001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.hasItem;
022import static org.hamcrest.Matchers.hasItems;
023import static org.hamcrest.Matchers.hasSize;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertThrows;
026
027import java.io.IOException;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Set;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HRegionLocation;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.master.HMaster;
040import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;
041import org.apache.hadoop.hbase.regionserver.RSRpcServices;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.junit.After;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052
053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
054import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
055
056@Category({ MediumTests.class, ClientTests.class })
057public class TestRpcConnectionRegistry {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestRpcConnectionRegistry.class);
062
063  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
064
065  private RpcConnectionRegistry registry;
066
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    // allow refresh immediately so we will switch to use region servers soon.
070    UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
071    UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
072    UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
073    UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
074    UTIL.startMiniCluster(3);
075    HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
076  }
077
078  @AfterClass
079  public static void tearDownAfterClass() throws Exception {
080    UTIL.shutdownMiniCluster();
081  }
082
083  @Before
084  public void setUp() throws IOException {
085    registry = new RpcConnectionRegistry(UTIL.getConfiguration(), User.getCurrent());
086  }
087
088  @After
089  public void tearDown() throws IOException {
090    Closeables.close(registry, true);
091  }
092
093  private void setMaxNodeCount(int count) {
094    UTIL.getMiniHBaseCluster().getMasterThreads().stream()
095      .map(t -> t.getMaster().getConfiguration())
096      .forEach(conf -> conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, count));
097    UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
098      .map(t -> t.getRegionServer().getConfiguration())
099      .forEach(conf -> conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, count));
100  }
101
102  @Test
103  public void testRegistryRPCs() throws Exception {
104    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
105    // should only contains the active master
106    Set<ServerName> initialParsedServers = registry.getParsedServers();
107    assertThat(initialParsedServers, hasSize(1));
108    // no start code in configuration
109    assertThat(initialParsedServers,
110      hasItem(ServerName.valueOf(activeMaster.getServerName().getHostname(),
111        activeMaster.getServerName().getPort(), -1)));
112    // Since our initial delay is 1 second, finally we should have refreshed the endpoints
113    UTIL.waitFor(5000, () -> registry.getParsedServers()
114      .contains(activeMaster.getServerManager().getOnlineServersList().get(0)));
115    Set<ServerName> parsedServers = registry.getParsedServers();
116    assertThat(parsedServers,
117      hasSize(activeMaster.getServerManager().getOnlineServersList().size()));
118    assertThat(parsedServers,
119      hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
120
121    // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
122    // because not all replicas had made it up before test started.
123    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
124
125    assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
126    assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
127    List<HRegionLocation> metaLocations =
128      Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
129    List<HRegionLocation> actualMetaLocations =
130      activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
131    Collections.sort(metaLocations);
132    Collections.sort(actualMetaLocations);
133    assertEquals(actualMetaLocations, metaLocations);
134
135    // test that the node count config works
136    setMaxNodeCount(1);
137    UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 1);
138  }
139
140  /**
141   * Make sure that we can create the RpcClient when there are broken servers in the bootstrap nodes
142   */
143  @Test
144  public void testBrokenBootstrapNodes() throws Exception {
145    Configuration conf = new Configuration(UTIL.getConfiguration());
146    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
147    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
148    String clusterId = activeMaster.getClusterId();
149    // Add a non-working master
150    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
151    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, badServer.toShortString());
152    // only a bad server, the request should fail
153    try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
154      assertThrows(IOException.class, () -> reg.getParsedServers());
155    }
156
157    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES,
158      badServer.toShortString() + ", " + currentMasterAddrs);
159    // we will choose bootstrap node randomly so here we need to test it multiple times to make sure
160    // that we can skip the broken node
161    for (int i = 0; i < 10; i++) {
162      try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
163        assertEquals(clusterId, reg.getClusterId().get());
164      }
165    }
166  }
167}