/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;

/**
 * This class is for testing {@link Connection}.
 * <p>
 * All tests share one two-node mini cluster started in {@link #setUpBeforeClass()}. Tests that
 * switch the balancer off rely on {@link #tearDown()} to switch it back on.
 */
@Category({ LargeTests.class })
public class TestConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnection.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a two-node mini cluster with netty leak detection set to PARANOID, so that
   * {@link #testCancelConnectionMemoryLeak()} can surface leaked buffers in the log output.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ResourceLeakDetector.setLevel(Level.PARANOID);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Re-enables the balancer, which several tests switch off.
   */
  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }

  /**
   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
   * @throws IOException Unable to construct admin
   */
  @Test
  public void testAdminFactory() throws IOException {
    // try-with-resources so that neither the connection nor the admin leaks if an assertion fails.
    try (Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Admin admin = con1.getAdmin()) {
      Assert.assertSame(con1, admin.getConnection());
      Assert.assertSame(TEST_UTIL.getConfiguration(), admin.getConfiguration());
    }
  }

  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }

  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }

  /**
   * Runs a background thread doing gets in a loop while the main thread repeatedly cancels the
   * rpc connections to the region server hosting the row, then checks that the background thread
   * finished without recording any exception.
   * @param allowsInterrupt value to set for {@link RpcClient#SPECIFIC_WRITE_THREAD}; also used as
   *                        a suffix of the table name so the two callers use distinct tables
   */
  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
    TEST_UTIL.createTable(tableName, FAM_NAM).close();

    TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
    // to avoid the client to be stuck when do the Get
    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);

    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
    final AtomicInteger step = new AtomicInteger(0);
    final AtomicReference<Throwable> failed = new AtomicReference<>(null);

    // try-with-resources: the table and the connection are released even when a wait times out.
    try (Connection connection = ConnectionFactory.createConnection(c2);
      Table table = connection.getTable(tableName)) {
      Put put = new Put(ROW);
      put.addColumn(FAM_NAM, ROW, ROW);
      table.put(put);

      Thread getter = new Thread("testConnectionCloseThread") {
        @Override
        public void run() {
          int done = 0;
          try {
            step.set(1);
            while (step.get() == 1) {
              table.get(new Get(ROW));
              done++;
              if (done % 100 == 0) {
                LOG.info("done={}", done);
              }
              // without the sleep, will cause the exception for too many files in
              // org.apache.hadoop.hdfs.server.datanode.DataXceiver
              Thread.sleep(100);
            }
          } catch (Throwable e) {
            failed.set(e);
            LOG.error(e.toString(), e);
          }
          step.set(3);
        }
      };
      getter.start();
      try {
        TEST_UTIL.waitFor(20000, () -> step.get() == 1);

        ServerName sn;
        try (RegionLocator rl = connection.getRegionLocator(tableName)) {
          sn = rl.getRegionLocation(ROW).getServerName();
        }

        RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;

        LOG.info("Going to cancel connections. connection={}, sn={}", connection, sn);
        for (int i = 0; i < 500; i++) {
          rpcClient.cancelConnections(sn);
          Thread.sleep(50);
        }
      } finally {
        // Always ask the getter thread to stop, including when something above threw; otherwise
        // the (non-daemon) thread would keep looping for the rest of the test run.
        step.compareAndSet(1, 2);
      }
      // The test may fail here if the thread doing the gets is stuck. The way to find
      // out what's happening is to look for the thread named 'testConnectionCloseThread'
      TEST_UTIL.waitFor(40000, () -> step.get() == 3);
    }
    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
  }

  /**
   * Test that connection can become idle without breaking everything.
   */
  @Test
  public void testConnectionIdle() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    int idleTime = 20000;
    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
    c2.setInt(RpcClient.IDLE_TIME, idleTime);

    try {
      try (Connection connection = ConnectionFactory.createConnection(c2);
        Table table = connection.getTable(tableName)) {
        Put put = new Put(ROW);
        put.addColumn(FAM_NAM, ROW, ROW);
        table.put(put);

        ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
        mee.setValue(EnvironmentEdgeManager.currentTime());
        EnvironmentEdgeManager.injectEdge(mee);
        LOG.info("first get");
        table.get(new Get(ROW));

        LOG.info("first get - changing the time & sleeping");
        mee.incValue(idleTime + 1000);
        Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
        // 1500 = sleep time in RpcClient#waitForWork + a margin

        LOG.info("second get - connection has been marked idle in the middle");
        // To check that the connection actually became idle would need to read some private
        // fields of RpcClient.
        table.get(new Get(ROW));
        mee.incValue(idleTime + 1000);

        LOG.info("third get - connection is idle, but the reader doesn't know yet");
        // We're testing here a special case:
        // time limit reached BUT connection not yet reclaimed AND a new call.
        // in this situation, we don't close the connection, instead we use it immediately.
        // If we're very unlucky we can have a race condition in the test: the connection is
        // already under closing when we do the get, so we have an exception, and we don't retry
        // as the retry number is 1. The probability is very very low, and seems acceptable for
        // now. It's a test issue only.
        table.get(new Get(ROW));

        LOG.info("we're done - time will change back");
      }
    } finally {
      // The injected edge is global state: restore the real clock even when a get fails, so a
      // failure here cannot skew the clock seen by the tests that run afterwards.
      EnvironmentEdgeManager.reset();
      TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
    }
  }

  /**
   * Checks that two connections created from the same configuration are distinct objects and are
   * closed independently of each other.
   */
  @Test
  public void testClosing() throws Exception {
    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
      String.valueOf(ThreadLocalRandom.current().nextInt()));

    // as connection caching is going away, now we're just testing
    // that closed connection does actually get closed.

    // try-with-resources only guards against leaks when an assertion fails; on the success path
    // both connections are already closed and the implicit second close() is expected to be a
    // no-op, as the java.io.Closeable contract requires.
    try (Connection c1 = ConnectionFactory.createConnection(configuration);
      Connection c2 = ConnectionFactory.createConnection(configuration)) {
      // no caching, different connections
      Assert.assertNotSame(c1, c2);

      // closing independently
      c1.close();
      assertTrue(c1.isClosed());
      assertFalse(c2.isClosed());

      c2.close();
      assertTrue(c2.isClosed());
    }
  }

  /**
   * Trivial test to verify that nobody messes with
   * {@link ConnectionFactory#createConnection(Configuration)}
   */
  @Test
  public void testCreateConnection() throws Exception {
    Configuration configuration = TEST_UTIL.getConfiguration();
    // Both connections are closed on exit; previously they were never closed.
    try (Connection c1 = ConnectionFactory.createConnection(configuration);
      Connection c2 = ConnectionFactory.createConnection(configuration)) {
      // created from the same configuration, yet they are different
      Assert.assertNotSame(c1, c2);
      Assert.assertSame(c1.getConfiguration(), c2.getConfiguration());
    }
  }

  /*
   * ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
   *
   * @Test
   * public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
   *   Configuration config = new Configuration(TEST_UTIL.getConfiguration());
   *   final TableName tableName = TableName.valueOf(name.getMethodName());
   *   TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
   *   Connection connection = ConnectionFactory.createConnection(config);
   *   Table table = connection.getTable(tableName);
   *   // this will cache the meta location and table's region location
   *   table.get(new Get(Bytes.toBytes("foo")));
   *   // restart HBase
   *   TEST_UTIL.shutdownMiniHBaseCluster();
   *   TEST_UTIL.restartHBaseCluster(2);
   *   // this should be able to discover new locations for meta and table's region
   *   table.get(new Get(Bytes.toBytes("foo")));
   *   TEST_UTIL.deleteTable(tableName);
   *   table.close();
   *   connection.close();
   * }
   */

  /**
   * Creates a table with three region replicas and checks that
   * {@link RegionLocator#getAllRegionLocations()} returns one location per replica id.
   */
  @Test
  public void testLocateRegionsWithRegionReplicas() throws IOException {
    int regionReplication = 3;
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());

    // Create a table with region replicas
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Get locations of the regions of the table
      List<HRegionLocation> locations = locator.getAllRegionLocations();

      // The size of the returned locations should be 3
      assertEquals(regionReplication, locations.size());

      // The replicaIds of the returned locations should be 0, 1 and 2
      Set<Integer> expectedReplicaIds =
        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
      for (HRegionLocation location : locations) {
        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Checks that a coprocessor service call issued through an already closed connection fails with
   * a {@link DoNotRetryIOException} instead of invoking the callable.
   */
  @Test(expected = DoNotRetryIOException.class)
  public void testClosedConnection() throws ServiceException, Throwable {
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    // cache the location
    try (Table table = conn.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes(0)));
    } finally {
      conn.close();
    }
    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
      throw new RuntimeException("Should not arrive here");
    };
    // The connection is deliberately closed at this point. The table handle is still managed by
    // try-with-resources so that it is released when the expected exception propagates.
    try (Table table = conn.getTable(tableName)) {
      table.coprocessorService(MultiRowMutationService.class, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, callable);
    }
  }

  // There is no assertion, but you need to confirm that there is no resource leak output from netty
  @Test
  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    TEST_UTIL.getAdmin().balancerSwitch(false, true);
    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Table table = connection.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes("1")));
      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
      RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
      rpcClient.cancelConnections(sn);
      // Sleep on both sides of the GC request; presumably this gives the netty leak detector a
      // chance to report buffers that became unreachable - TODO confirm.
      Thread.sleep(1000);
      System.gc();
      Thread.sleep(1000);
    }
  }
}