001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.lang.reflect.InvocationTargetException; 029import java.lang.reflect.Modifier; 030import java.net.SocketTimeoutException; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Set; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.SynchronousQueue; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.stream.Collectors; 043import java.util.stream.IntStream; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.CallDroppedException; 046import org.apache.hadoop.hbase.CallQueueTooBigException; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseServerException; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.RegionLocations; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.hadoop.hbase.TableName; 056import org.apache.hadoop.hbase.Waiter; 057import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 058import org.apache.hadoop.hbase.exceptions.DeserializationException; 059import org.apache.hadoop.hbase.exceptions.RegionMovedException; 060import org.apache.hadoop.hbase.filter.Filter; 061import org.apache.hadoop.hbase.filter.FilterBase; 062import org.apache.hadoop.hbase.ipc.RpcClient; 063import org.apache.hadoop.hbase.master.HMaster; 064import org.apache.hadoop.hbase.regionserver.HRegion; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.Region; 067import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.JVMClusterUtil; 072import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 073import org.apache.hadoop.hbase.util.ReflectionUtils; 074import org.apache.hadoop.hbase.util.Threads; 075import org.junit.After; 076import org.junit.AfterClass; 077import org.junit.Assert; 078import org.junit.BeforeClass; 079import org.junit.ClassRule; 080import org.junit.Ignore; 081import org.junit.Rule; 082import org.junit.Test; 083import org.junit.experimental.categories.Category; 084import org.junit.rules.TestName; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 089import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 090import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 091import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level; 092 093/** 094 * This class is for testing HBaseConnectionManager features 095 */ 096@Category({ LargeTests.class }) 097public class TestConnectionImplementation { 098 099 @ClassRule 100 public static final HBaseClassTestRule CLASS_RULE = 101 HBaseClassTestRule.forClass(TestConnectionImplementation.class); 102 103 private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class); 104 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 105 private static final TableName TABLE_NAME = TableName.valueOf("test"); 106 private static final TableName TABLE_NAME1 = TableName.valueOf("test1"); 107 private static final TableName TABLE_NAME2 = TableName.valueOf("test2"); 108 private static final TableName TABLE_NAME3 = TableName.valueOf("test3"); 109 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 110 private static final byte[] ROW = Bytes.toBytes("bbb"); 111 private static final byte[] ROW_X = Bytes.toBytes("xxx"); 112 private static final int RPC_RETRY = 5; 113 114 @Rule 115 public TestName name = new TestName(); 116 117 @BeforeClass 118 public static void setUpBeforeClass() throws Exception { 119 ResourceLeakDetector.setLevel(Level.PARANOID); 120 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 121 // Up the handlers; this test needs more than usual. 122 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 123 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 124 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3); 125 TEST_UTIL.startMiniCluster(2); 126 127 } 128 129 @AfterClass 130 public static void tearDownAfterClass() throws Exception { 131 TEST_UTIL.shutdownMiniCluster(); 132 } 133 134 @After 135 public void tearDown() throws IOException { 136 TEST_UTIL.getAdmin().balancerSwitch(true, true); 137 } 138 139 @Test 140 public void testClusterConnection() throws IOException { 141 ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, 142 new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d") 143 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 144 145 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 146 Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool); 147 // make sure the internally created ExecutorService is the one passed 148 assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool()); 149 150 final TableName tableName = TableName.valueOf(name.getMethodName()); 151 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 152 Table table = con1.getTable(tableName, otherPool); 153 154 ExecutorService pool = null; 155 156 if (table instanceof HTable) { 157 HTable t = (HTable) table; 158 // make sure passing a pool to the getTable does not trigger creation of an internal pool 159 assertNull("Internal Thread pool should be null", 160 ((ConnectionImplementation) con1).getCurrentBatchPool()); 161 // table should use the pool passed 162 assertTrue(otherPool == t.getPool()); 163 t.close(); 164 165 t = (HTable) con2.getTable(tableName); 166 // table should use the connectin's internal pool 167 assertTrue(otherPool == t.getPool()); 168 t.close(); 169 170 t = (HTable) con2.getTable(tableName); 171 // try other API too 172 assertTrue(otherPool == t.getPool()); 173 t.close(); 174 175 t = (HTable) con2.getTable(tableName); 176 // try other API too 177 assertTrue(otherPool == t.getPool()); 178 t.close(); 179 180 t = (HTable) con1.getTable(tableName); 181 pool = ((ConnectionImplementation) con1).getCurrentBatchPool(); 182 // make sure an internal pool was created 183 assertNotNull("An internal Thread pool should have been created", pool); 184 // and that the table is using it 185 assertTrue(t.getPool() == pool); 186 t.close(); 187 188 t = (HTable) con1.getTable(tableName); 189 // still using the *same* internal pool 190 assertTrue(t.getPool() == pool); 191 t.close(); 192 } else { 193 table.close(); 194 } 195 196 con1.close(); 197 198 // if the pool was created on demand it should be closed upon connection close 199 if (pool != null) { 200 assertTrue(pool.isShutdown()); 201 } 202 203 con2.close(); 204 // if the pool is passed, it is not closed 205 assertFalse(otherPool.isShutdown()); 206 otherPool.shutdownNow(); 207 } 208 209 /** 210 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object 211 * @throws IOException Unable to construct admin 212 */ 213 @Test 214 public void testAdminFactory() throws IOException { 215 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 216 Admin admin = con1.getAdmin(); 217 assertTrue(admin.getConnection() == con1); 218 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration()); 219 con1.close(); 220 } 221 222 // Fails too often! Needs work. HBASE-12558 223 // May only fail on non-linux machines? E.g. macosx. 224 @Ignore 225 @Test(expected = RegionServerStoppedException.class) 226 // Depends on mulitcast messaging facility that seems broken in hbase2 227 // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of 228 // dead servers is broke" 229 public void testClusterStatus() throws Exception { 230 final TableName tableName = TableName.valueOf(name.getMethodName()); 231 byte[] cf = "cf".getBytes(); 232 byte[] rk = "rk1".getBytes(); 233 234 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer(); 235 rs.waitForServerOnline(); 236 final ServerName sn = rs.getRegionServer().getServerName(); 237 238 Table t = TEST_UTIL.createTable(tableName, cf); 239 TEST_UTIL.waitTableAvailable(tableName); 240 TEST_UTIL.waitUntilNoRegionsInTransition(); 241 242 final ConnectionImplementation hci = (ConnectionImplementation) TEST_UTIL.getConnection(); 243 try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 244 while (l.getRegionLocation(rk).getPort() != sn.getPort()) { 245 TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(), 246 sn); 247 TEST_UTIL.waitUntilNoRegionsInTransition(); 248 hci.clearRegionCache(tableName); 249 } 250 Assert.assertNotNull(hci.clusterStatusListener); 251 TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000); 252 } 253 254 Put p1 = new Put(rk); 255 p1.addColumn(cf, "qual".getBytes(), "val".getBytes()); 256 t.put(p1); 257 258 rs.getRegionServer().abort("I'm dead"); 259 260 // We want the status to be updated. That's a least 10 second 261 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 262 @Override 263 public boolean evaluate() throws Exception { 264 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().getDeadServers() 265 .isDeadServer(sn); 266 } 267 }); 268 269 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 270 @Override 271 public boolean evaluate() throws Exception { 272 return hci.clusterStatusListener.isDeadServer(sn); 273 } 274 }); 275 276 t.close(); 277 hci.getClient(sn); // will throw an exception: RegionServerStoppedException 278 } 279 280 /** 281 * Test that we can handle connection close: it will trigger a retry, but the calls will finish. 282 */ 283 @Test 284 public void testConnectionCloseAllowsInterrupt() throws Exception { 285 testConnectionClose(true); 286 } 287 288 @Test 289 public void testConnectionNotAllowsInterrupt() throws Exception { 290 testConnectionClose(false); 291 } 292 293 private void testConnectionClose(boolean allowsInterrupt) throws Exception { 294 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); 295 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 296 297 TEST_UTIL.getAdmin().balancerSwitch(false, true); 298 299 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 300 // We want to work on a separate connection. 301 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 302 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot 303 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 304 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire 305 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); 306 // to avoid the client to be stuck when do the Get 307 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000); 308 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000); 309 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); 310 311 Connection connection = ConnectionFactory.createConnection(c2); 312 final Table table = connection.getTable(tableName); 313 314 Put put = new Put(ROW); 315 put.addColumn(FAM_NAM, ROW, ROW); 316 table.put(put); 317 318 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3 319 final AtomicInteger step = new AtomicInteger(0); 320 321 final AtomicReference<Throwable> failed = new AtomicReference<>(null); 322 Thread t = new Thread("testConnectionCloseThread") { 323 @Override 324 public void run() { 325 int done = 0; 326 try { 327 step.set(1); 328 while (step.get() == 1) { 329 Get get = new Get(ROW); 330 table.get(get); 331 done++; 332 if (done % 100 == 0) LOG.info("done=" + done); 333 // without the sleep, will cause the exception for too many files in 334 // org.apache.hadoop.hdfs.server.datanode.DataXceiver 335 Thread.sleep(100); 336 } 337 } catch (Throwable t) { 338 failed.set(t); 339 LOG.error(t.toString(), t); 340 } 341 step.set(3); 342 } 343 }; 344 t.start(); 345 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { 346 @Override 347 public boolean evaluate() throws Exception { 348 return step.get() == 1; 349 } 350 }); 351 352 ServerName sn; 353 try (RegionLocator rl = connection.getRegionLocator(tableName)) { 354 sn = rl.getRegionLocation(ROW).getServerName(); 355 } 356 ConnectionImplementation conn = (ConnectionImplementation) connection; 357 RpcClient rpcClient = conn.getRpcClient(); 358 359 LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); 360 for (int i = 0; i < 500; i++) { 361 rpcClient.cancelConnections(sn); 362 Thread.sleep(50); 363 } 364 365 step.compareAndSet(1, 2); 366 // The test may fail here if the thread doing the gets is stuck. The way to find 367 // out what's happening is to look for the thread named 'testConnectionCloseThread' 368 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() { 369 @Override 370 public boolean evaluate() throws Exception { 371 return step.get() == 3; 372 } 373 }); 374 table.close(); 375 connection.close(); 376 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); 377 } 378 379 /** 380 * Test that connection can become idle without breaking everything. 381 */ 382 @Test 383 public void testConnectionIdle() throws Exception { 384 final TableName tableName = TableName.valueOf(name.getMethodName()); 385 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 386 int idleTime = 20000; 387 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 388 389 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 390 // We want to work on a separate connection. 391 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 392 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed 393 c2.setInt(RpcClient.IDLE_TIME, idleTime); 394 395 Connection connection = ConnectionFactory.createConnection(c2); 396 final Table table = connection.getTable(tableName); 397 398 Put put = new Put(ROW); 399 put.addColumn(FAM_NAM, ROW, ROW); 400 table.put(put); 401 402 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 403 mee.setValue(EnvironmentEdgeManager.currentTime()); 404 EnvironmentEdgeManager.injectEdge(mee); 405 LOG.info("first get"); 406 table.get(new Get(ROW)); 407 408 LOG.info("first get - changing the time & sleeping"); 409 mee.incValue(idleTime + 1000); 410 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle. 411 // 1500 = sleep time in RpcClient#waitForWork + a margin 412 413 LOG.info("second get - connection has been marked idle in the middle"); 414 // To check that the connection actually became idle would need to read some private 415 // fields of RpcClient. 416 table.get(new Get(ROW)); 417 mee.incValue(idleTime + 1000); 418 419 LOG.info("third get - connection is idle, but the reader doesn't know yet"); 420 // We're testing here a special case: 421 // time limit reached BUT connection not yet reclaimed AND a new call. 422 // in this situation, we don't close the connection, instead we use it immediately. 423 // If we're very unlucky we can have a race condition in the test: the connection is already 424 // under closing when we do the get, so we have an exception, and we don't retry as the 425 // retry number is 1. The probability is very very low, and seems acceptable for now. It's 426 // a test issue only. 427 table.get(new Get(ROW)); 428 429 LOG.info("we're done - time will change back"); 430 431 table.close(); 432 433 connection.close(); 434 EnvironmentEdgeManager.reset(); 435 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 436 } 437 438 /** 439 * Test that the connection to the dead server is cut immediately when we receive the 440 * notification. 441 */ 442 @Test 443 public void testConnectionCut() throws Exception { 444 final TableName tableName = TableName.valueOf(name.getMethodName()); 445 446 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 447 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 448 449 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 450 // We want to work on a separate connection. 451 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 452 // try only once w/o any retry 453 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 454 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); 455 456 final Connection connection = ConnectionFactory.createConnection(c2); 457 final Table table = connection.getTable(tableName); 458 459 Put p = new Put(FAM_NAM); 460 p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); 461 table.put(p); 462 463 final ConnectionImplementation hci = (ConnectionImplementation) connection; 464 465 final HRegionLocation loc; 466 try (RegionLocator rl = connection.getRegionLocator(tableName)) { 467 loc = rl.getRegionLocation(FAM_NAM); 468 } 469 470 Get get = new Get(FAM_NAM); 471 Assert.assertNotNull(table.get(get)); 472 473 get = new Get(FAM_NAM); 474 get.setFilter(new BlockingFilter()); 475 476 // This thread will mark the server as dead while we're waiting during a get. 477 Thread t = new Thread() { 478 @Override 479 public void run() { 480 synchronized (syncBlockingFilter) { 481 try { 482 syncBlockingFilter.wait(); 483 } catch (InterruptedException e) { 484 throw new RuntimeException(e); 485 } 486 } 487 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName()); 488 } 489 }; 490 491 t.start(); 492 try { 493 table.get(get); 494 Assert.fail(); 495 } catch (IOException expected) { 496 LOG.debug("Received: " + expected); 497 Assert.assertFalse(expected instanceof SocketTimeoutException); 498 Assert.assertFalse(syncBlockingFilter.get()); 499 } finally { 500 syncBlockingFilter.set(true); 501 t.join(); 502 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 503 } 504 505 table.close(); 506 connection.close(); 507 } 508 509 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false); 510 511 public static class BlockingFilter extends FilterBase { 512 @Override 513 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { 514 int i = 0; 515 while (i++ < 1000 && !syncBlockingFilter.get()) { 516 synchronized (syncBlockingFilter) { 517 syncBlockingFilter.notifyAll(); 518 } 519 Threads.sleep(100); 520 } 521 syncBlockingFilter.set(true); 522 return false; 523 } 524 525 @Override 526 public ReturnCode filterCell(final Cell ignored) throws IOException { 527 return ReturnCode.INCLUDE; 528 } 529 530 public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException { 531 return new BlockingFilter(); 532 } 533 } 534 535 /** 536 * Test that when we delete a location using the first row of a region that we really delete it. 537 */ 538 @Test 539 public void testRegionCaching() throws Exception { 540 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); 541 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 542 // test with no retry, or client cache will get updated after the first failure 543 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 544 Connection connection = ConnectionFactory.createConnection(conf); 545 final Table table = connection.getTable(TABLE_NAME); 546 547 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); 548 Put put = new Put(ROW); 549 put.addColumn(FAM_NAM, ROW, ROW); 550 table.put(put); 551 552 ConnectionImplementation conn = (ConnectionImplementation) connection; 553 554 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 555 556 // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at 557 // a location where the port is current port number +1 -- i.e. a non-existent location. 558 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 559 final int nextPort = loc.getPort() + 1; 560 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), 561 ServerName.valueOf("127.0.0.1", nextPort, HConstants.LATEST_TIMESTAMP), 562 HConstants.LATEST_TIMESTAMP); 563 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort(), 564 nextPort); 565 566 conn.clearRegionCache(TABLE_NAME, ROW.clone()); 567 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW); 568 assertNull("What is this location?? " + rl, rl); 569 570 // We're now going to move the region and check that it works for the client 571 // First a new put to add the location in the cache 572 conn.clearRegionCache(TABLE_NAME); 573 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); 574 Put put2 = new Put(ROW); 575 put2.addColumn(FAM_NAM, ROW, ROW); 576 table.put(put2); 577 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 578 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone())); 579 580 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 581 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 582 583 // We can wait for all regions to be online, that makes log reading easier when debugging 584 TEST_UTIL.waitUntilNoRegionsInTransition(); 585 586 // Now moving the region to the second server 587 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 588 byte[] regionName = toMove.getRegionInfo().getRegionName(); 589 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 590 591 // Choose the other server. 592 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 593 int destServerId = curServerId == 0 ? 1 : 0; 594 595 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 596 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 597 598 ServerName destServerName = destServer.getServerName(); 599 600 // Check that we are in the expected state 601 Assert.assertTrue(curServer != destServer); 602 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); 603 Assert.assertFalse(toMove.getPort() == destServerName.getPort()); 604 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 605 Assert.assertNull(destServer.getOnlineRegion(regionName)); 606 Assert.assertFalse( 607 TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()); 608 609 // Moving. It's possible that we don't have all the regions online at this point, so 610 // the test must depend only on the region we're looking at. 611 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 612 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 613 614 while ( 615 destServer.getOnlineRegion(regionName) == null 616 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 617 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 618 || master.getAssignmentManager().hasRegionsInTransition() 619 ) { 620 // wait for the move to be finished 621 Thread.sleep(1); 622 } 623 624 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 625 626 // Check our new state. 627 Assert.assertNull(curServer.getOnlineRegion(regionName)); 628 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 629 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 630 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 631 632 // Cache was NOT updated and points to the wrong server 633 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() 634 == destServerName.getPort()); 635 636 // This part relies on a number of tries equals to 1. 637 // We do a put and expect the cache to be updated, even if we don't retry 638 LOG.info("Put starting"); 639 Put put3 = new Put(ROW); 640 put3.addColumn(FAM_NAM, ROW, ROW); 641 try { 642 table.put(put3); 643 Assert.fail("Unreachable point"); 644 } catch (RetriesExhaustedWithDetailsException e) { 645 LOG.info("Put done, exception caught: " + e.getClass()); 646 Assert.assertEquals(1, e.getNumExceptions()); 647 Assert.assertEquals(1, e.getCauses().size()); 648 Assert.assertArrayEquals(ROW, e.getRow(0).getRow()); 649 650 // Check that we unserialized the exception as expected 651 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); 652 Assert.assertNotNull(cause); 653 Assert.assertTrue(cause instanceof RegionMovedException); 654 } catch (RetriesExhaustedException ree) { 655 // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException 656 // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue. 657 LOG.info("Put done, exception caught: " + ree.getClass()); 658 Throwable cause = ClientExceptionsUtil.findException(ree.getCause()); 659 Assert.assertNotNull(cause); 660 Assert.assertTrue(cause instanceof RegionMovedException); 661 } 662 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW)); 663 Assert.assertEquals("Previous server was " + curServer.getServerName().getAddress(), 664 destServerName.getPort(), 665 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 666 667 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 668 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 669 670 // We move it back to do another test with a scan 671 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 672 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), 673 curServer.getServerName()); 674 675 while ( 676 curServer.getOnlineRegion(regionName) == null 677 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 678 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 679 || master.getAssignmentManager().hasRegionsInTransition() 680 ) { 681 // wait for the move to be finished 682 Thread.sleep(1); 683 } 684 685 // Check our new state. 686 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 687 Assert.assertNull(destServer.getOnlineRegion(regionName)); 688 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 689 690 // Cache was NOT updated and points to the wrong server 691 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() 692 == curServer.getServerName().getPort()); 693 694 Scan sc = new Scan(); 695 sc.setStopRow(ROW); 696 sc.setStartRow(ROW); 697 698 // The scanner takes the max retries from the connection configuration, not the table as 699 // the put. 700 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 701 702 try { 703 ResultScanner rs = table.getScanner(sc); 704 while (rs.next() != null) { 705 } 706 Assert.fail("Unreachable point"); 707 } catch (RetriesExhaustedException e) { 708 LOG.info("Scan done, expected exception caught: " + e.getClass()); 709 } 710 711 // Cache is updated with the right value. 712 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 713 Assert.assertEquals("Previous server was " + destServer.getServerName().getAddress(), 714 curServer.getServerName().getPort(), 715 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 716 717 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 718 table.close(); 719 connection.close(); 720 } 721 722 /** 723 * Test that Connection or Pool are not closed when managed externally 724 */ 725 @Test 726 public void testConnectionManagement() throws Exception { 727 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); 728 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 729 Table table = conn.getTable(TABLE_NAME1); 730 table.close(); 731 assertFalse(conn.isClosed()); 732 if (table instanceof HTable) { 733 assertFalse(((HTable) table).getPool().isShutdown()); 734 } 735 table = conn.getTable(TABLE_NAME1); 736 table.close(); 737 if (table instanceof HTable) { 738 assertFalse(((HTable) table).getPool().isShutdown()); 739 } 740 conn.close(); 741 if (table instanceof HTable) { 742 assertTrue(((HTable) table).getPool().isShutdown()); 743 } 744 table0.close(); 745 } 746 747 /** 748 * Test that stale cache updates don't override newer cached values. 749 */ 750 @Test 751 public void testCacheSeqNums() throws Exception { 752 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM); 753 Put put = new Put(ROW); 754 put.addColumn(FAM_NAM, ROW, ROW); 755 table.put(put); 756 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 757 758 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 759 assertNotNull(location); 760 761 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); 762 763 // Same server as already in cache reporting - overwrites any value despite seqNum. 764 int nextPort = location.getPort() + 1; 765 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 766 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 767 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 768 Assert.assertEquals(nextPort, location.getPort()); 769 770 // No source specified - same. 771 nextPort = location.getPort() + 1; 772 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 773 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 774 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 775 Assert.assertEquals(nextPort, location.getPort()); 776 777 // Higher seqNum - overwrites lower seqNum. 778 nextPort = location.getPort() + 1; 779 conn.updateCachedLocation(location.getRegionInfo(), anySource, 780 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1); 781 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 782 Assert.assertEquals(nextPort, location.getPort()); 783 784 // Lower seqNum - does not overwrite higher seqNum. 785 nextPort = location.getPort() + 1; 786 conn.updateCachedLocation(location.getRegionInfo(), anySource, 787 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 788 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 789 Assert.assertEquals(nextPort - 1, location.getPort()); 790 table.close(); 791 } 792 793 @Test 794 public void testClosing() throws Exception { 795 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); 796 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, 797 String.valueOf(ThreadLocalRandom.current().nextInt())); 798 799 // as connection caching is going away, now we're just testing 800 // that closed connection does actually get closed. 801 802 Connection c1 = ConnectionFactory.createConnection(configuration); 803 Connection c2 = ConnectionFactory.createConnection(configuration); 804 // no caching, different connections 805 assertTrue(c1 != c2); 806 807 // closing independently 808 c1.close(); 809 assertTrue(c1.isClosed()); 810 assertFalse(c2.isClosed()); 811 812 c2.close(); 813 assertTrue(c2.isClosed()); 814 } 815 816 /** 817 * Trivial test to verify that nobody messes with 818 * {@link ConnectionFactory#createConnection(Configuration)} 819 */ 820 @Test 821 public void testCreateConnection() throws Exception { 822 Configuration configuration = TEST_UTIL.getConfiguration(); 823 Connection c1 = ConnectionFactory.createConnection(configuration); 824 Connection c2 = ConnectionFactory.createConnection(configuration); 825 // created from the same configuration, yet they are different 826 assertTrue(c1 != c2); 827 assertTrue(c1.getConfiguration() == c2.getConfiguration()); 828 } 829 830 /** 831 * This test checks that one can connect to the cluster with only the ZooKeeper quorum set. Other 832 * stuff like master address will be read from ZK by the client. 833 */ 834 @Test 835 public void testConnection() throws Exception { 836 // We create an empty config and add the ZK address. 837 Configuration c = new Configuration(); 838 // This test only makes sense for ZK based connection registry. 839 c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 840 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 841 c.set(HConstants.ZOOKEEPER_QUORUM, 842 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); 843 c.set(HConstants.ZOOKEEPER_CLIENT_PORT, 844 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); 845 // This should be enough to connect 846 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c); 847 assertTrue(conn.isMasterRunning()); 848 conn.close(); 849 } 850 851 private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception { 852 Field numTries = hci.getClass().getDeclaredField("numTries"); 853 numTries.setAccessible(true); 854 Field modifiersField = ReflectionUtils.getModifiersField(); 855 modifiersField.setAccessible(true); 856 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL); 857 final int prevNumRetriesVal = (Integer) numTries.get(hci); 858 numTries.set(hci, newVal); 859 860 return prevNumRetriesVal; 861 } 862 863 @Test 864 public void testMulti() throws Exception { 865 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM); 866 try { 867 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 868 869 // We're now going to move the region and check that it works for the client 870 // First a new put to add the location in the cache 871 conn.clearRegionCache(TABLE_NAME3); 872 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); 873 874 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 875 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 876 877 // We can wait for all regions to be online, that makes log reading easier when debugging 878 TEST_UTIL.waitUntilNoRegionsInTransition(); 879 880 Put put = new Put(ROW_X); 881 put.addColumn(FAM_NAM, ROW_X, ROW_X); 882 table.put(put); 883 884 // Now moving the region to the second server 885 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); 886 byte[] regionName = toMove.getRegionInfo().getRegionName(); 887 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 888 889 // Choose the other server. 890 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 891 int destServerId = (curServerId == 0 ? 1 : 0); 892 893 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 894 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 895 896 ServerName destServerName = destServer.getServerName(); 897 ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 898 899 // find another row in the cur server that is less than ROW_X 900 List<HRegion> regions = curServer.getRegions(TABLE_NAME3); 901 byte[] otherRow = null; 902 for (Region region : regions) { 903 if ( 904 !region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) 905 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0 906 ) { 907 otherRow = region.getRegionInfo().getStartKey(); 908 break; 909 } 910 } 911 assertNotNull(otherRow); 912 // If empty row, set it to first row.-f 913 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); 914 Put put2 = new Put(otherRow); 915 put2.addColumn(FAM_NAM, otherRow, otherRow); 916 table.put(put2); // cache put2's location 917 918 // Check that we are in the expected state 919 Assert.assertTrue(curServer != destServer); 920 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); 921 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); 922 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 923 Assert.assertNull(destServer.getOnlineRegion(regionName)); 924 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager() 925 .hasRegionsInTransition()); 926 927 // Moving. It's possible that we don't have all the regions online at this point, so 928 // the test depends only on the region we're looking at. 929 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 930 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 931 932 while ( 933 destServer.getOnlineRegion(regionName) == null 934 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 935 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 936 || master.getAssignmentManager().hasRegionsInTransition() 937 ) { 938 // wait for the move to be finished 939 Thread.sleep(1); 940 } 941 942 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 943 944 // Check our new state. 945 Assert.assertNull(curServer.getOnlineRegion(regionName)); 946 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 947 Assert 948 .assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 949 Assert 950 .assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 951 952 // Cache was NOT updated and points to the wrong server 953 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation().getPort() 954 == destServerName.getPort()); 955 956 // Hijack the number of retry to fail after 2 tries 957 final int prevNumRetriesVal = setNumTries(conn, 2); 958 959 Put put3 = new Put(ROW_X); 960 put3.addColumn(FAM_NAM, ROW_X, ROW_X); 961 Put put4 = new Put(otherRow); 962 put4.addColumn(FAM_NAM, otherRow, otherRow); 963 964 // do multi 965 ArrayList<Put> actions = Lists.newArrayList(put4, put3); 966 table.batch(actions, null); // first should be a valid row, 967 // second we get RegionMovedException. 968 969 setNumTries(conn, prevNumRetriesVal); 970 } finally { 971 table.close(); 972 } 973 } 974 975 @Test 976 public void testErrorBackoffTimeCalculation() throws Exception { 977 // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. 978 final long ANY_PAUSE = 100; 979 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); 980 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); 981 982 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); 983 EnvironmentEdgeManager.injectEdge(timeMachine); 984 try { 985 long largeAmountOfTime = ANY_PAUSE * 1000; 986 ConnectionImplementation.ServerErrorTracker tracker = 987 new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); 988 989 // The default backoff is 0. 990 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); 991 992 // Check some backoff values from HConstants sequence. 993 tracker.reportServerError(location); 994 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 995 tracker.calculateBackoffTime(location, ANY_PAUSE)); 996 tracker.reportServerError(location); 997 tracker.reportServerError(location); 998 tracker.reportServerError(location); 999 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3], 1000 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1001 1002 // All of this shouldn't affect backoff for different location. 1003 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1004 tracker.reportServerError(diffLocation); 1005 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1006 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1007 1008 // Check with different base. 1009 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3], 1010 tracker.calculateBackoffTime(location, ANY_PAUSE * 2)); 1011 } finally { 1012 EnvironmentEdgeManager.reset(); 1013 } 1014 } 1015 1016 private static void assertEqualsWithJitter(long expected, long actual) { 1017 assertEqualsWithJitter(expected, actual, expected); 1018 } 1019 1020 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { 1021 assertTrue("Value not within jitter: " + expected + " vs " + actual, 1022 Math.abs(actual - expected) <= (0.01f * jitterBase)); 1023 } 1024 1025 @Test 1026 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException { 1027 Configuration config = new Configuration(TEST_UTIL.getConfiguration()); 1028 // This test only makes sense for ZK based connection registry. 1029 config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 1030 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 1031 1032 final TableName tableName = TableName.valueOf(name.getMethodName()); 1033 TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close(); 1034 1035 Connection connection = ConnectionFactory.createConnection(config); 1036 Table table = connection.getTable(tableName); 1037 1038 // this will cache the meta location and table's region location 1039 table.get(new Get(Bytes.toBytes("foo"))); 1040 1041 // restart HBase 1042 TEST_UTIL.shutdownMiniHBaseCluster(); 1043 TEST_UTIL.restartHBaseCluster(2); 1044 // this should be able to discover new locations for meta and table's region 1045 table.get(new Get(Bytes.toBytes("foo"))); 1046 TEST_UTIL.deleteTable(tableName); 1047 table.close(); 1048 connection.close(); 1049 } 1050 1051 @Test 1052 public void testLocateRegionsWithRegionReplicas() throws IOException { 1053 int regionReplication = 3; 1054 byte[] family = Bytes.toBytes("cf"); 1055 TableName tableName = TableName.valueOf(name.getMethodName()); 1056 1057 // Create a table with region replicas 1058 TableDescriptorBuilder builder = 1059 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication) 1060 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1061 TEST_UTIL.getAdmin().createTable(builder.build()); 1062 1063 try (ConnectionImplementation con = 1064 (ConnectionImplementation) ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { 1065 // Get locations of the regions of the table 1066 List<HRegionLocation> locations = con.locateRegions(tableName, false, false); 1067 1068 // The size of the returned locations should be 3 1069 assertEquals(regionReplication, locations.size()); 1070 1071 // The replicaIds of the returned locations should be 0, 1 and 2 1072 Set<Integer> expectedReplicaIds = 1073 IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet()); 1074 for (HRegionLocation location : locations) { 1075 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId())); 1076 } 1077 } finally { 1078 TEST_UTIL.deleteTable(tableName); 1079 } 1080 } 1081 1082 @Test 1083 public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException { 1084 testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class); 1085 } 1086 1087 @Test 1088 public void testLocateRegionsRetrySpecialPauseCDE() throws IOException { 1089 testLocateRegionsRetrySpecialPause(CallDroppedException.class); 1090 } 1091 1092 private void testLocateRegionsRetrySpecialPause( 1093 Class<? extends HBaseServerException> exceptionClass) throws IOException { 1094 1095 int regionReplication = 3; 1096 byte[] family = Bytes.toBytes("cf"); 1097 TableName tableName = TableName.valueOf(name.getMethodName()); 1098 1099 // Create a table with region replicas 1100 TableDescriptorBuilder builder = 1101 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication) 1102 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1103 TEST_UTIL.getAdmin().createTable(builder.build()); 1104 1105 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 1106 1107 conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, ThrowingCallerFactory.class, 1108 RpcRetryingCallerFactory.class); 1109 conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class); 1110 1111 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 1112 // normal pause very short, 10 millis 1113 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10); 1114 1115 // special pause 10x longer, so we can detect it 1116 long specialPauseMillis = 1000; 1117 conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 1118 specialPauseMillis); 1119 1120 try (ConnectionImplementation con = 1121 (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { 1122 // Get locations of the regions of the table 1123 1124 long start = System.nanoTime(); 1125 try { 1126 con.locateRegion(tableName, new byte[0], false, true, 0); 1127 } catch (HBaseServerException e) { 1128 assertTrue(e.isServerOverloaded()); 1129 // pass: expected 1130 } 1131 assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis)); 1132 } finally { 1133 TEST_UTIL.deleteTable(tableName); 1134 } 1135 } 1136 1137 private static class ThrowingCallerFactory extends RpcRetryingCallerFactory { 1138 1139 private final Class<? extends HBaseServerException> exceptionClass; 1140 1141 public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) { 1142 super(conf, connectionConfig); 1143 this.exceptionClass = 1144 conf.getClass("testSpecialPauseException", null, HBaseServerException.class); 1145 } 1146 1147 @Override 1148 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 1149 return newCaller(); 1150 } 1151 1152 @Override 1153 public <T> RpcRetryingCaller<T> newCaller() { 1154 return new RpcRetryingCaller<T>() { 1155 @Override 1156 public void cancel() { 1157 1158 } 1159 1160 @Override 1161 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 1162 throws IOException, RuntimeException { 1163 return callWithoutRetries(null, 0); 1164 } 1165 1166 @Override 1167 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 1168 throws IOException, RuntimeException { 1169 try { 1170 throw exceptionClass.getConstructor().newInstance(); 1171 } catch (IllegalAccessException | InstantiationException | InvocationTargetException 1172 | NoSuchMethodException e) { 1173 throw new RuntimeException(e); 1174 } 1175 } 1176 }; 1177 } 1178 } 1179 1180 @Test 1181 public void testMetaLookupThreadPoolCreated() throws Exception { 1182 final TableName tableName = TableName.valueOf(name.getMethodName()); 1183 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; 1184 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 1185 TEST_UTIL.getAdmin().disableTable(tableName); 1186 TEST_UTIL.getAdmin().deleteTable(tableName); 1187 } 1188 try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) { 1189 byte[] row = Bytes.toBytes("test"); 1190 ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); 1191 // check that metalookup pool would get created 1192 c.relocateRegion(tableName, row); 1193 ExecutorService ex = c.getCurrentMetaLookupPool(); 1194 assertNotNull(ex); 1195 } 1196 } 1197 1198 // There is no assertion, but you need to confirm that there is no resource leak output from netty 1199 @Test 1200 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException { 1201 TableName tableName = TableName.valueOf(name.getMethodName()); 1202 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 1203 TEST_UTIL.getAdmin().balancerSwitch(false, true); 1204 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 1205 Table table = connection.getTable(tableName)) { 1206 table.get(new Get(Bytes.toBytes("1"))); 1207 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName(); 1208 RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient(); 1209 rpcClient.cancelConnections(sn); 1210 Thread.sleep(1000); 1211 System.gc(); 1212 Thread.sleep(1000); 1213 } 1214 } 1215}