/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * Tests of the client-side meta (region location) cache: when it is populated, when it is
 * preserved and when it is cleared. The single region server of the mini cluster is replaced by
 * {@link RegionServerWithFakeRpcServices} so that individual tests can install an
 * {@link ExceptionInjector} which makes get/mutate/scan RPCs against {@link #TABLE_NAME} fail (or
 * stall) in a controlled way.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestMetaCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMetaCache.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
  private static final byte[] FAMILY = Bytes.toBytes("fam1");
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  private static HRegionServer badRS;
  private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);

  /**
   * Starts a one-node mini cluster whose region server uses {@link FakeRSRpcServices}, waits for
   * meta to be assigned and creates {@link #TABLE_NAME} with a single family.
   * @throws Exception if the cluster cannot be started or the table cannot be created
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    // META_TABLE_NAME is a static constant; reference it through the class, not an instance.
    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
    fam.setMaxVersions(2);
    table.addFamily(fam);
    TEST_UTIL.createTable(table, null);
  }

  /**
   * Shuts the mini cluster down.
   * @throws Exception if the shutdown fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Merges three regions into one and verifies, via the meta-cache-miss metric of both the sync
   * and the async connection, that the first request after the merge misses the cache and the
   * following one does not.
   */
  @Test
  public void testMergeEmptyWithMetaCache() throws Throwable {
    TableName tableName = TableName.valueOf("MergeEmpty");
    byte[] family = Bytes.toBytes("CF");
    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
    TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
    TEST_UTIL.waitTableAvailable(tableName);
    TEST_UTIL.waitUntilNoRegionsInTransition();
    RegionInfo regionA = null;
    RegionInfo regionB = null;
    RegionInfo regionC = null;
    for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
      if (region.getStartKey().length == 0) {
        regionA = region;
      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
        regionB = region;
      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
        regionC = region;
      }
    }

    assertNotNull(regionA);
    assertNotNull(regionB);
    assertNotNull(regionC);

    // Enable client metrics on a private copy so the shared test configuration is not modified
    // for the tests that run after this one.
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    try (Connection conn = ConnectionFactory.createConnection(conf);
      AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
      ConnectionImplementation connImpl = (ConnectionImplementation) conn;
      AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;

      MetricsConnection metrics = connImpl.getConnectionMetrics();
      MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();

      // warm meta cache
      conn.getRegionLocator(tableName).getAllRegionLocations();
      asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();

      Assert.assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());

      // Merge the 3 regions into one
      TEST_UTIL.getAdmin().mergeRegionsAsync(
        new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
        false).get(30, TimeUnit.SECONDS);

      Assert.assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());

      try (Table table = conn.getTable(tableName)) {
        AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
        ThrowingRunnable syncGet = () -> table.get(new Get(Bytes.toBytes(6)));
        ThrowingRunnable asyncGet = () -> asyncTable.get(new Get(Bytes.toBytes(6))).get();

        // This request should cause us to cache the newly merged region.
        // As part of caching that region, it should clear out any cached merge parent regions
        // which are overlapped by the new region. That way, subsequent calls below won't fall
        // into the bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on
        // cached regionB and we'd continue to see cache misses below.
        assertTrue(executeAndGetNewMisses(syncGet, metrics) > 0);
        assertTrue(executeAndGetNewMisses(asyncGet, asyncMetrics) > 0);

        // We verify no new cache misses here due to above, which proves we've fixed up the cache
        assertEquals(0, executeAndGetNewMisses(syncGet, metrics));
        assertEquals(0, executeAndGetNewMisses(asyncGet, asyncMetrics));
      }
    }
  }

  /**
   * Runs {@code runnable} and returns by how much the meta-cache-miss counter of {@code metrics}
   * grew while it ran.
   * @param runnable the client operation to execute
   * @param metrics  the connection metrics to sample before and after the operation
   * @return the number of new meta cache misses recorded during the operation
   * @throws Throwable whatever {@code runnable} throws
   */
  private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics)
    throws Throwable {
    long lastVal = metrics.getMetaCacheMisses();
    runnable.run();
    long curVal = metrics.getMetaCacheMisses();
    return curVal - lastVal;
  }

  /**
   * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally
   * encountered when using floorEntry rather than lowerEntry.
   */
  @Test
  public void testAddToCacheReverse() throws IOException, InterruptedException, ExecutionException {
    try (
      AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) ConnectionFactory
        .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
      ConnectionImplementation conn = (ConnectionImplementation) ConnectionFactory
        .createConnection(TEST_UTIL.getConfiguration())) {

      AsyncNonMetaRegionLocator asyncLocator = asyncConn.getLocator().getNonMetaRegionLocator();

      TableName tableName = TableName.valueOf("testAddToCache");
      byte[] family = Bytes.toBytes("CF");
      TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
      int maxSplits = 10;
      List<byte[]> splits =
        IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());

      TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
      TEST_UTIL.waitTableAvailable(tableName);
      TEST_UTIL.waitUntilNoRegionsInTransition();

      assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());

      AsyncTableRegionLocator asyncLocatorForTable = asyncConn.getRegionLocator(tableName);
      try (RegionLocator locatorForTable = conn.getRegionLocator(tableName)) {
        // Locate from the highest key down so that locations are added to the cache in reverse.
        for (int i = maxSplits; i >= 0; i--) {
          locatorForTable.getRegionLocation(Bytes.toBytes(i));
          // Wait for the async lookup to finish: otherwise the cache assertions below race with
          // the in-flight lookups and the insertion order is no longer guaranteed.
          asyncLocatorForTable.getRegionLocation(Bytes.toBytes(i)).get();
        }
      }

      for (int i = 0; i < maxSplits; i++) {
        assertNotNull(asyncLocator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
        assertNotNull(conn.getCachedLocation(tableName, Bytes.toBytes(i)));
      }
    }
  }

  /**
   * Repeatedly issues a sequence of operations against a server that rotates through meta-clearing
   * and meta-preserving exceptions, and checks after every round that the cached location of the
   * row was dropped or kept accordingly.
   */
  @Test
  public void testPreserveMetaCacheOnException() throws Exception {
    ((FakeRSRpcServices) badRS.getRSRpcServices())
      .setExceptionInjector(new RoundRobinExceptionInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set("hbase.client.retries.number", "1");
    try (
      ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TABLE_NAME)) {
      byte[] row = Bytes.toBytes("row1");

      Put put = new Put(row);
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
      Get get = new Get(row);
      Append append = new Append(row);
      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
      Increment increment = new Increment(row);
      increment.addColumn(FAMILY, QUALIFIER, 10);
      Delete delete = new Delete(row);
      delete.addColumn(FAMILY, QUALIFIER);
      RowMutations mutations = new RowMutations(row);
      mutations.add(put);
      mutations.add(delete);

      for (int i = 0; i < 50; i++) {
        Exception exp = null;
        boolean success = false;
        try {
          table.put(put);
          // If at least one operation succeeded, we should have cached the region location.
          success = true;
          table.get(get);
          table.append(append);
          table.increment(increment);
          table.delete(delete);
          table.mutateRow(mutations);
        } catch (IOException ex) {
          // Only keep track of the last exception that updated the meta cache
          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
            exp = ex;
          }
        }
        // Do not test if we did not touch the meta cache in this iteration.
        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
          assertNull(conn.getCachedLocation(TABLE_NAME, row));
        } else if (success) {
          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
        }
      }
    }
  }

  /**
   * Verifies that a meta-clearing exception hit while opening a scanner removes the cached region
   * location, for both forward and reversed scans.
   */
  @Test
  public void testClearsCacheOnScanException() throws Exception {
    ((FakeRSRpcServices) badRS.getRSRpcServices())
      .setExceptionInjector(new RoundRobinExceptionInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set("hbase.client.retries.number", "1");

    try (
      ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TABLE_NAME)) {

      byte[] row = Bytes.toBytes("row2");

      Put put = new Put(row);
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));

      Scan scan = new Scan();
      scan.withStartRow(row);
      scan.setLimit(1);
      scan.setCaching(1);

      populateCache(table, row);
      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
      assertTrue(executeUntilCacheClearingException(table, scan));
      assertNull(conn.getCachedLocation(TABLE_NAME, row));

      // repopulate cache so we can test with reverse scan too
      populateCache(table, row);
      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));

      // run with reverse scan
      scan.setReversed(true);
      assertTrue(executeUntilCacheClearingException(table, scan));
      assertNull(conn.getCachedLocation(TABLE_NAME, row));
    }
  }

  /**
   * Issues gets for {@code row} until one succeeds, giving up silently after 50 attempts. Callers
   * assert on the cache afterwards, so a total failure is detected there.
   * @param table the table to read from
   * @param row   the row whose region location should end up cached
   */
  private void populateCache(Table table, byte[] row) {
    for (int i = 0; i < 50; i++) {
      try {
        table.get(new Get(row));
        return;
      } catch (Exception e) {
        // pass, we just want this to succeed so that region location will be cached
      }
    }
  }

  /**
   * Opens a scanner and fetches one result, up to 50 times, until an attempt fails with a
   * meta-clearing exception.
   * @param table the table to scan
   * @param scan  the scan to execute
   * @return {@code true} as soon as an attempt fails with a meta-clearing exception, {@code false}
   *         if none of the 50 attempts did
   */
  private boolean executeUntilCacheClearingException(Table table, Scan scan) {
    for (int i = 0; i < 50; i++) {
      try {
        try (ResultScanner scanner = table.getScanner(scan)) {
          scanner.next();
        }
      } catch (Exception ex) {
        // Stop at the first exception that clears the meta cache; ignore all others and retry.
        if (ClientExceptionsUtil.isMetaClearingException(ex)) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Verifies that a get failing with CallQueueTooBigException leaves the region/server cache-clear
   * counters of the connection metrics untouched.
   */
  @Test
  public void testCacheClearingOnCallQueueTooBig() throws Exception {
    ((FakeRSRpcServices) badRS.getRSRpcServices())
      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set("hbase.client.retries.number", "2");
    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
    try (
      ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TABLE_NAME)) {
      byte[] row = Bytes.toBytes("row1");

      Put put = new Put(row);
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
      table.put(put);

      // obtain the client metrics
      MetricsConnection metrics = conn.getConnectionMetrics();
      long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
      long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();

      // attempt a get on the test table
      Get get = new Get(row);
      try {
        table.get(get);
        fail("Expected CallQueueTooBigException");
      } catch (RetriesExhaustedException ree) {
        // expected
      }

      // verify that no cache clearing took place
      long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
      long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
      assertEquals(preGetRegionClears, postGetRegionClears);
      assertEquals(preGetServerClears, postGetServerClears);
    }
  }

  /**
   * Returns the exceptions that {@link RoundRobinExceptionInjector} rotates through as its
   * "meta cache preserving" failures.
   * @return a fixed-size list of freshly created exception instances
   */
  public static List<Throwable> metaCachePreservingExceptions() {
    return Arrays.asList(new RegionOpeningException(" "),
      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
      new CallQueueTooBigException());
  }

  /**
   * Region server whose RPC services are a {@link FakeRSRpcServices}, so tests can inject
   * failures into client RPCs.
   */
  public static class RegionServerWithFakeRpcServices extends HRegionServer {
    private FakeRSRpcServices rsRpcServices;

    public RegionServerWithFakeRpcServices(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      this.rsRpcServices = new FakeRSRpcServices(this);
      return rsRpcServices;
    }

    public void setExceptionInjector(ExceptionInjector injector) {
      rsRpcServices.setExceptionInjector(injector);
    }
  }

  /**
   * RPC services that consult the current {@link ExceptionInjector} before delegating get, mutate
   * and scan calls to the real implementation.
   */
  public static class FakeRSRpcServices extends RSRpcServices {

    // Replaced by the test thread and read by RPC handler threads, hence volatile.
    private volatile ExceptionInjector exceptions;

    public FakeRSRpcServices(HRegionServer rs) throws IOException {
      super(rs);
      exceptions = new RoundRobinExceptionInjector();
    }

    public void setExceptionInjector(ExceptionInjector injector) {
      this.exceptions = injector;
    }

    @Override
    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
      throws ServiceException {
      exceptions.throwOnGet(this, request);
      return super.get(controller, request);
    }

    @Override
    public ClientProtos.MutateResponse mutate(final RpcController controller,
      final ClientProtos.MutateRequest request) throws ServiceException {
      exceptions.throwOnMutate(this, request);
      return super.mutate(controller, request);
    }

    @Override
    public ClientProtos.ScanResponse scan(final RpcController controller,
      final ClientProtos.ScanRequest request) throws ServiceException {
      exceptions.throwOnScan(this, request);
      return super.scan(controller, request);
    }
  }

  /**
   * Hook invoked by {@link FakeRSRpcServices} before each get, mutate and scan RPC. An
   * implementation may throw to make the RPC fail, or return normally to let it proceed.
   */
  public static abstract class ExceptionInjector {
    /**
     * Tells whether the region addressed by {@code regionSpec} belongs to {@link #TABLE_NAME}.
     * @throws ServiceException wrapping the IOException raised when the region cannot be resolved
     */
    protected boolean isTestTable(FakeRSRpcServices rpcServices,
      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
      try {
        return TABLE_NAME
          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    }

    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
      throws ServiceException;

    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
      ClientProtos.MutateRequest request) throws ServiceException;

    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
      ClientProtos.ScanRequest request) throws ServiceException;
  }

  /**
   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
   */
  public static class RoundRobinExceptionInjector extends ExceptionInjector {
    private int numReqs = -1;
    private int expCount = -1;
    private final List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();

    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
      throws ServiceException {
      throwSomeExceptions(rpcServices, request.getRegion());
    }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
      throws ServiceException {
      throwSomeExceptions(rpcServices, request.getRegion());
    }

    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
      throws ServiceException {
      if (!request.hasScannerId()) {
        // only handle initial scan requests
        throwSomeExceptions(rpcServices, request.getRegion());
      }
    }

    /**
     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
     * throw NotServingRegionException which clears the meta cache.
     */
    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
      if (!isTestTable(rpcServices, regionSpec)) {
        return;
      }

      numReqs++;
      // Succeed on every 5th request, throw cache clearing exceptions twice every 5 requests and
      // throw meta cache preserving exceptions otherwise.
      int slot = numReqs % 5;
      if (slot == 0) {
        return;
      } else if (slot == 1 || slot == 2) {
        throw new ServiceException(new NotServingRegionException());
      }
      // Round robin between different special exceptions.
      // This is not ideal since exception types are not tied to the operation performed here,
      // But, we don't really care here if we throw MultiActionTooLargeException while doing
      // single Gets.
      expCount++;
      Throwable t =
        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
      throw new ServiceException(t);
    }
  }

  /**
   * Throws CallQueueTooBigException for all gets.
   */
  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
      throws ServiceException {
      if (isTestTable(rpcServices, request.getRegion())) {
        throw new ServiceException(new CallQueueTooBigException());
      }
    }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
      throws ServiceException {
    }

    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
      throws ServiceException {
    }
  }

  /**
   * Two threads locate the same region with a forced reload while every scan stalls on the
   * server. Exactly one of them is expected to fail with {@link LockTimeoutException}, and the
   * userRegionLock metrics are expected to reflect one successful and one timed-out acquisition.
   */
  @Test
  public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);

    try (ConnectionImplementation conn =
      (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
      ClientThread client1 = new ClientThread(conn);
      ClientThread client2 = new ClientThread(conn);
      client1.start();
      client2.start();
      client1.join();
      client2.join();
      // One thread will get the lock but will sleep in LockSleepInjector#throwOnScan and
      // eventually fail since the sleep time is more than hbase client scanner timeout period.
      // Other thread will wait to acquire userRegionLock.
      // Have no idea which thread will be scheduled first. So need to check both threads.

      // Both the threads will throw exception. One thread will throw exception since after
      // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
      // is 2 seconds.
      // Other thread will throw exception since it was not able to get hold of user region lock
      // within meta operation timeout period.
      assertNotNull(client1.getException());
      assertNotNull(client2.getException());

      assertTrue(client1.getException() instanceof LockTimeoutException
        ^ client2.getException() instanceof LockTimeoutException);

      // obtain the client metrics
      MetricsConnection metrics = conn.getConnectionMetrics();
      long queueCount = metrics.getUserRegionLockQueue().getCount();
      assertEquals("Queue of userRegionLock should be updated twice. queueCount: " + queueCount, 2,
        queueCount);

      long timeoutCount = metrics.getUserRegionLockTimeout().getCount();
      assertEquals("Timeout of userRegionLock should happen once. timeoutCount: " + timeoutCount, 1,
        timeoutCount);

      long waitingTimerCount = metrics.getUserRegionLockWaitingTimer().getCount();
      assertEquals("userRegionLock should be grabbed successfully once. waitingTimerCount: "
        + waitingTimerCount, 1, waitingTimerCount);

      long heldTimerCount = metrics.getUserRegionLockHeldTimer().getCount();
      assertEquals(
        "userRegionLock should be held successfully once. heldTimerCount: " + heldTimerCount, 1,
        heldTimerCount);
      double heldTime = metrics.getUserRegionLockHeldTimer().getSnapshot().getMax();
      assertTrue("Max held time should be greater than 2 seconds. heldTime: " + heldTime,
        heldTime >= 2E9);
    }
  }

  /**
   * Thread that performs one forced-reload region lookup for the first region of
   * {@link #TABLE_NAME} and records the IOException it fails with, if any. Read the exception
   * only after {@link Thread#join()} has returned.
   */
  private static final class ClientThread extends Thread {
    private Exception exception;
    private final ConnectionImplementation connection;

    private ClientThread(ConnectionImplementation connection) {
      this.connection = connection;
    }

    @Override
    public void run() {
      byte[] currentKey = HConstants.EMPTY_START_ROW;
      try {
        connection.getRegionLocation(TABLE_NAME, currentKey, true);
      } catch (IOException e) {
        LOG.error("Thread id: {} exception: ", this.getId(), e);
        this.exception = e;
      }
    }

    public Exception getException() {
      return exception;
    }
  }

  /**
   * Stalls every scan RPC for five seconds and lets gets and mutations through untouched.
   */
  public static class LockSleepInjector extends ExceptionInjector {
    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        LOG.info("Interrupted exception", e);
        // Restore the interrupt status so the calling handler thread can still observe it.
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) {
    }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) {
    }
  }
}