001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.net.SocketTimeoutException; 026import java.util.Map; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 036import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.junit.AfterClass; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 051import org.apache.hbase.thirdparty.com.google.protobuf.Message; 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057@Category({ MediumTests.class, ClientTests.class }) 058public class TestClientTimeouts { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestClientTimeouts.class); 063 064 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 065 protected static int SLAVES = 1; 066 067 /** 068 * @throws java.lang.Exception 069 */ 070 @BeforeClass 071 public static void setUpBeforeClass() throws Exception { 072 TEST_UTIL.startMiniCluster(SLAVES); 073 } 074 075 /** 076 * @throws java.lang.Exception 077 */ 078 @AfterClass 079 public static void tearDownAfterClass() throws Exception { 080 TEST_UTIL.shutdownMiniCluster(); 081 } 082 083 private Connection createConnection() { 084 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 085 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 086 // Set the custom RPC client with random timeouts as the client 087 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 088 RandomTimeoutRpcClient.class.getName()); 089 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 090 for (;;) { 091 try { 092 return ConnectionFactory.createConnection(conf); 093 } catch (IOException e) { 094 // since we randomly throw SocketTimeoutException, it is possible that we fail when creating 095 // the Connection, but this is not what we want to test here, so just ignore it and try 096 // again 097 } 098 } 099 } 100 101 /** 102 * Test that a client that fails an RPC to the master retries properly and doesn't throw any 103 * unexpected exceptions. 104 */ 105 @Test 106 public void testAdminTimeout() throws Exception { 107 try (Connection conn = createConnection(); Admin admin = conn.getAdmin()) { 108 int initialInvocations = invokations.get(); 109 boolean balanceEnabled = admin.isBalancerEnabled(); 110 for (int i = 0; i < 5; i++) { 111 assertEquals(balanceEnabled, admin.balancerSwitch(!balanceEnabled, false)); 112 balanceEnabled = !balanceEnabled; 113 } 114 // Ensure the RandomTimeoutRpcEngine is actually being used. 115 assertTrue(invokations.get() > initialInvocations); 116 } 117 } 118 119 /** 120 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 121 */ 122 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 123 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 124 MetricsConnection metrics, Map<String, byte[]> connectionAttributes) { 125 super(conf, clusterId, localAddr, metrics, connectionAttributes); 126 } 127 128 // Return my own instance, one that does random timeouts 129 @Override 130 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 131 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 132 } 133 134 @Override 135 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 136 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 137 } 138 } 139 140 private static final double CHANCE_OF_TIMEOUT = 0.3; 141 private static AtomicInteger invokations = new AtomicInteger(); 142 143 /** 144 * Blocking rpc channel that goes via hbase rpc. 145 */ 146 static class RandomTimeoutBlockingRpcChannel 147 extends AbstractRpcClient.BlockingRpcChannelImplementation { 148 149 RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, 150 final User ticket, final int rpcTimeout) { 151 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 152 } 153 154 @Override 155 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, 156 Message returnType) throws ServiceException { 157 invokations.getAndIncrement(); 158 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 159 // throw a ServiceException, becuase that is the only exception type that 160 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 161 // "actual" type, this may not properly mimic the underlying RpcEngine. 162 throw new ServiceException(new SocketTimeoutException("fake timeout")); 163 } 164 return super.callBlockingMethod(md, controller, param, returnType); 165 } 166 } 167 168 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 169 170 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 171 int rpcTimeout) { 172 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 173 } 174 175 @Override 176 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 177 Message returnType, RpcCallback<Message> done) { 178 invokations.getAndIncrement(); 179 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 180 // throw a ServiceException, because that is the only exception type that 181 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 182 // "actual" type, this may not properly mimic the underlying RpcEngine. 183 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 184 done.run(null); 185 return; 186 } 187 super.callMethod(md, controller, param, returnType, done); 188 } 189 } 190}