001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static junit.framework.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.TimeUnit;
031import java.util.stream.Collectors;
032import java.util.stream.IntStream;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CallQueueTooBigException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HColumnDescriptor;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.MultiActionResultTooLarge;
041import org.apache.hadoop.hbase.NotServingRegionException;
042import org.apache.hadoop.hbase.RegionTooBusyException;
043import org.apache.hadoop.hbase.RetryImmediatelyException;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
046import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
047import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RSRpcServices;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.AfterClass;
054import org.junit.Assert;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.function.ThrowingRunnable;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
065
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
069
070@Category({ MediumTests.class, ClientTests.class })
071public class TestMetaCache {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestMetaCache.class);
076
077  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
079  private static final byte[] FAMILY = Bytes.toBytes("fam1");
080  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
081  private static HRegionServer badRS;
082  private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);
083
084  /**
085   * @throws java.lang.Exception
086   */
087  @BeforeClass
088  public static void setUpBeforeClass() throws Exception {
089    Configuration conf = TEST_UTIL.getConfiguration();
090    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName());
091    TEST_UTIL.startMiniCluster(1);
092    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
093    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
094    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
095    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
096    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
097    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
098    fam.setMaxVersions(2);
099    table.addFamily(fam);
100    TEST_UTIL.createTable(table, null);
101  }
102
103  /**
104   * @throws java.lang.Exception
105   */
106  @AfterClass
107  public static void tearDownAfterClass() throws Exception {
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  @Test
112  public void testMergeEmptyWithMetaCache() throws Throwable {
113    TableName tableName = TableName.valueOf("MergeEmpty");
114    byte[] family = Bytes.toBytes("CF");
115    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
116      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
117    TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
118    TEST_UTIL.waitTableAvailable(tableName);
119    TEST_UTIL.waitUntilNoRegionsInTransition();
120    RegionInfo regionA = null;
121    RegionInfo regionB = null;
122    RegionInfo regionC = null;
123    for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
124      if (region.getStartKey().length == 0) {
125        regionA = region;
126      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
127        regionB = region;
128      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
129        regionC = region;
130      }
131    }
132
133    assertNotNull(regionA);
134    assertNotNull(regionB);
135    assertNotNull(regionC);
136
137    TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY,
138      true);
139    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
140      AsyncConnection asyncConn =
141        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
142      ConnectionImplementation connImpl = (ConnectionImplementation) conn;
143      AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;
144
145      MetricsConnection metrics = connImpl.getConnectionMetrics();
146      MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();
147
148      // warm meta cache
149      conn.getRegionLocator(tableName).getAllRegionLocations();
150      asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();
151
152      Assert.assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());
153
154      // Merge the 3 regions into one
155      TEST_UTIL.getAdmin().mergeRegionsAsync(
156        new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
157        false).get(30, TimeUnit.SECONDS);
158
159      Assert.assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());
160
161      Table table = conn.getTable(tableName);
162      AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
163
164      // This request should cause us to cache the newly merged region.
165      // As part of caching that region, it should clear out any cached merge parent regions which
166      // are overlapped by the new region. That way, subsequent calls below won't fall into the
167      // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached
168      // regionB and we'd continue to see cache misses below.
169      assertTrue(executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics) > 0);
170      assertTrue(
171        executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics)
172            > 0);
173
174      // We verify no new cache misses here due to above, which proves we've fixed up the cache
175      assertEquals(0, executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics));
176      assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(),
177        asyncMetrics));
178    }
179  }
180
181  private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics)
182    throws Throwable {
183    long lastVal = metrics.getMetaCacheMisses();
184    runnable.run();
185    long curVal = metrics.getMetaCacheMisses();
186    return curVal - lastVal;
187  }
188
189  /**
190   * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally
191   * encountered when using floorEntry rather than lowerEntry.
192   */
193  @Test
194  public void testAddToCacheReverse() throws IOException, InterruptedException, ExecutionException {
195    try (
196      AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) ConnectionFactory
197        .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
198      ConnectionImplementation conn = (ConnectionImplementation) ConnectionFactory
199        .createConnection(TEST_UTIL.getConfiguration())) {
200
201      AsyncNonMetaRegionLocator asyncLocator = asyncConn.getLocator().getNonMetaRegionLocator();
202
203      TableName tableName = TableName.valueOf("testAddToCache");
204      byte[] family = Bytes.toBytes("CF");
205      TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
206        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
207      int maxSplits = 10;
208      List<byte[]> splits =
209        IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());
210
211      TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
212      TEST_UTIL.waitTableAvailable(tableName);
213      TEST_UTIL.waitUntilNoRegionsInTransition();
214      conn.getRegionLocator(tableName);
215
216      assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());
217
218      RegionLocator locatorForTable = conn.getRegionLocator(tableName);
219      AsyncTableRegionLocator asyncLocatorForTable = asyncConn.getRegionLocator(tableName);
220      for (int i = maxSplits; i >= 0; i--) {
221        locatorForTable.getRegionLocation(Bytes.toBytes(i));
222        asyncLocatorForTable.getRegionLocation(Bytes.toBytes(i));
223      }
224
225      for (int i = 0; i < maxSplits; i++) {
226        assertNotNull(asyncLocator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
227        assertNotNull(conn.getCachedLocation(tableName, Bytes.toBytes(i)));
228      }
229    }
230
231  }
232
233  @Test
234  public void testPreserveMetaCacheOnException() throws Exception {
235    ((FakeRSRpcServices) badRS.getRSRpcServices())
236      .setExceptionInjector(new RoundRobinExceptionInjector());
237    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
238    conf.set("hbase.client.retries.number", "1");
239    ConnectionImplementation conn =
240      (ConnectionImplementation) ConnectionFactory.createConnection(conf);
241    try {
242      Table table = conn.getTable(TABLE_NAME);
243      byte[] row = Bytes.toBytes("row1");
244
245      Put put = new Put(row);
246      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
247      Get get = new Get(row);
248      Append append = new Append(row);
249      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
250      Increment increment = new Increment(row);
251      increment.addColumn(FAMILY, QUALIFIER, 10);
252      Delete delete = new Delete(row);
253      delete.addColumn(FAMILY, QUALIFIER);
254      RowMutations mutations = new RowMutations(row);
255      mutations.add(put);
256      mutations.add(delete);
257
258      Exception exp;
259      boolean success;
260      for (int i = 0; i < 50; i++) {
261        exp = null;
262        success = false;
263        try {
264          table.put(put);
265          // If at least one operation succeeded, we should have cached the region location.
266          success = true;
267          table.get(get);
268          table.append(append);
269          table.increment(increment);
270          table.delete(delete);
271          table.mutateRow(mutations);
272        } catch (IOException ex) {
273          // Only keep track of the last exception that updated the meta cache
274          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
275            exp = ex;
276          }
277        }
278        // Do not test if we did not touch the meta cache in this iteration.
279        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
280          assertNull(conn.getCachedLocation(TABLE_NAME, row));
281        } else if (success) {
282          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
283        }
284      }
285    } finally {
286      conn.close();
287    }
288  }
289
290  @Test
291  public void testClearsCacheOnScanException() throws Exception {
292    ((FakeRSRpcServices) badRS.getRSRpcServices())
293      .setExceptionInjector(new RoundRobinExceptionInjector());
294    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
295    conf.set("hbase.client.retries.number", "1");
296
297    try (
298      ConnectionImplementation conn =
299        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
300      Table table = conn.getTable(TABLE_NAME)) {
301
302      byte[] row = Bytes.toBytes("row2");
303
304      Put put = new Put(row);
305      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
306
307      Scan scan = new Scan();
308      scan.withStartRow(row);
309      scan.setLimit(1);
310      scan.setCaching(1);
311
312      populateCache(table, row);
313      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
314      assertTrue(executeUntilCacheClearingException(table, scan));
315      assertNull(conn.getCachedLocation(TABLE_NAME, row));
316
317      // repopulate cache so we can test with reverse scan too
318      populateCache(table, row);
319      assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
320
321      // run with reverse scan
322      scan.setReversed(true);
323      assertTrue(executeUntilCacheClearingException(table, scan));
324      assertNull(conn.getCachedLocation(TABLE_NAME, row));
325    }
326  }
327
328  private void populateCache(Table table, byte[] row) {
329    for (int i = 0; i < 50; i++) {
330      try {
331        table.get(new Get(row));
332        return;
333      } catch (Exception e) {
334        // pass, we just want this to succeed so that region location will be cached
335      }
336    }
337  }
338
339  private boolean executeUntilCacheClearingException(Table table, Scan scan) {
340    for (int i = 0; i < 50; i++) {
341      try {
342        try (ResultScanner scanner = table.getScanner(scan)) {
343          scanner.next();
344        }
345      } catch (Exception ex) {
346        // Only keep track of the last exception that updated the meta cache
347        if (ClientExceptionsUtil.isMetaClearingException(ex)) {
348          return true;
349        }
350      }
351    }
352    return false;
353  }
354
355  @Test
356  public void testCacheClearingOnCallQueueTooBig() throws Exception {
357    ((FakeRSRpcServices) badRS.getRSRpcServices())
358      .setExceptionInjector(new CallQueueTooBigExceptionInjector());
359    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
360    conf.set("hbase.client.retries.number", "2");
361    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
362    ConnectionImplementation conn =
363      (ConnectionImplementation) ConnectionFactory.createConnection(conf);
364    try {
365      Table table = conn.getTable(TABLE_NAME);
366      byte[] row = Bytes.toBytes("row1");
367
368      Put put = new Put(row);
369      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
370      table.put(put);
371
372      // obtain the client metrics
373      MetricsConnection metrics = conn.getConnectionMetrics();
374      long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
375      long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
376
377      // attempt a get on the test table
378      Get get = new Get(row);
379      try {
380        table.get(get);
381        fail("Expected CallQueueTooBigException");
382      } catch (RetriesExhaustedException ree) {
383        // expected
384      }
385
386      // verify that no cache clearing took place
387      long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
388      long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
389      assertEquals(preGetRegionClears, postGetRegionClears);
390      assertEquals(preGetServerClears, postGetServerClears);
391    } finally {
392      conn.close();
393    }
394  }
395
396  public static List<Throwable> metaCachePreservingExceptions() {
397    return Arrays.asList(new RegionOpeningException(" "),
398      new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "),
399      new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "),
400      new CallQueueTooBigException());
401  }
402
403  public static class RegionServerWithFakeRpcServices extends HRegionServer {
404    private FakeRSRpcServices rsRpcServices;
405
406    public RegionServerWithFakeRpcServices(Configuration conf)
407      throws IOException, InterruptedException {
408      super(conf);
409    }
410
411    @Override
412    protected RSRpcServices createRpcServices() throws IOException {
413      this.rsRpcServices = new FakeRSRpcServices(this);
414      return rsRpcServices;
415    }
416
417    public void setExceptionInjector(ExceptionInjector injector) {
418      rsRpcServices.setExceptionInjector(injector);
419    }
420  }
421
422  public static class FakeRSRpcServices extends RSRpcServices {
423
424    private ExceptionInjector exceptions;
425
426    public FakeRSRpcServices(HRegionServer rs) throws IOException {
427      super(rs);
428      exceptions = new RoundRobinExceptionInjector();
429    }
430
431    public void setExceptionInjector(ExceptionInjector injector) {
432      this.exceptions = injector;
433    }
434
435    @Override
436    public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
437      throws ServiceException {
438      exceptions.throwOnGet(this, request);
439      return super.get(controller, request);
440    }
441
442    @Override
443    public ClientProtos.MutateResponse mutate(final RpcController controller,
444      final ClientProtos.MutateRequest request) throws ServiceException {
445      exceptions.throwOnMutate(this, request);
446      return super.mutate(controller, request);
447    }
448
449    @Override
450    public ClientProtos.ScanResponse scan(final RpcController controller,
451      final ClientProtos.ScanRequest request) throws ServiceException {
452      exceptions.throwOnScan(this, request);
453      return super.scan(controller, request);
454    }
455  }
456
457  public static abstract class ExceptionInjector {
458    protected boolean isTestTable(FakeRSRpcServices rpcServices,
459      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
460      try {
461        return TABLE_NAME
462          .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
463      } catch (IOException ioe) {
464        throw new ServiceException(ioe);
465      }
466    }
467
468    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
469      throws ServiceException;
470
471    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
472      ClientProtos.MutateRequest request) throws ServiceException;
473
474    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
475      ClientProtos.ScanRequest request) throws ServiceException;
476  }
477
478  /**
479   * Rotates through the possible cache clearing and non-cache clearing exceptions for requests.
480   */
481  public static class RoundRobinExceptionInjector extends ExceptionInjector {
482    private int numReqs = -1;
483    private int expCount = -1;
484    private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
485
486    @Override
487    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
488      throws ServiceException {
489      throwSomeExceptions(rpcServices, request.getRegion());
490    }
491
492    @Override
493    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
494      throws ServiceException {
495      throwSomeExceptions(rpcServices, request.getRegion());
496    }
497
498    @Override
499    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
500      throws ServiceException {
501      if (!request.hasScannerId()) {
502        // only handle initial scan requests
503        throwSomeExceptions(rpcServices, request.getRegion());
504      }
505    }
506
507    /**
508     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically
509     * throw NotSevingRegionException which clears the meta cache.
510     */
511    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
512      HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
513      if (!isTestTable(rpcServices, regionSpec)) {
514        return;
515      }
516
517      numReqs++;
518      // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw
519      // meta cache preserving exceptions otherwise.
520      if (numReqs % 5 == 0) {
521        return;
522      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
523        throw new ServiceException(new NotServingRegionException());
524      }
525      // Round robin between different special exceptions.
526      // This is not ideal since exception types are not tied to the operation performed here,
527      // But, we don't really care here if we throw MultiActionTooLargeException while doing
528      // single Gets.
529      expCount++;
530      Throwable t =
531        metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size());
532      throw new ServiceException(t);
533    }
534  }
535
536  /**
537   * Throws CallQueueTooBigException for all gets.
538   */
539  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
540    @Override
541    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
542      throws ServiceException {
543      if (isTestTable(rpcServices, request.getRegion())) {
544        throw new ServiceException(new CallQueueTooBigException());
545      }
546    }
547
548    @Override
549    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
550      throws ServiceException {
551    }
552
553    @Override
554    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
555      throws ServiceException {
556    }
557  }
558
559  @Test
560  public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
561    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
562    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
563    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
564    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
565    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
566    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
567
568    try (ConnectionImplementation conn =
569      (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
570      ClientThread client1 = new ClientThread(conn);
571      ClientThread client2 = new ClientThread(conn);
572      client1.start();
573      client2.start();
574      client1.join();
575      client2.join();
576      // One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and
577      // eventually fail since the sleep time is more than hbase client scanner timeout period.
578      // Other thread will wait to acquire userRegionLock.
579      // Have no idea which thread will be scheduled first. So need to check both threads.
580
581      // Both the threads will throw exception. One thread will throw exception since after
582      // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
583      // is 2 seconds.
584      // Other thread will throw exception since it was not able to get hold of user region lock
585      // within meta operation timeout period.
586      assertNotNull(client1.getException());
587      assertNotNull(client2.getException());
588
589      assertTrue(client1.getException() instanceof LockTimeoutException
590        ^ client2.getException() instanceof LockTimeoutException);
591
592      // obtain the client metrics
593      MetricsConnection metrics = conn.getConnectionMetrics();
594      long queueCount = metrics.getUserRegionLockQueue().getCount();
595      assertEquals("Queue of userRegionLock should be updated twice. queueCount: " + queueCount, 2,
596        queueCount);
597
598      long timeoutCount = metrics.getUserRegionLockTimeout().getCount();
599      assertEquals("Timeout of userRegionLock should happen once. timeoutCount: " + timeoutCount, 1,
600        timeoutCount);
601
602      long waitingTimerCount = metrics.getUserRegionLockWaitingTimer().getCount();
603      assertEquals("userRegionLock should be grabbed successfully once. waitingTimerCount: "
604        + waitingTimerCount, 1, waitingTimerCount);
605
606      long heldTimerCount = metrics.getUserRegionLockHeldTimer().getCount();
607      assertEquals(
608        "userRegionLock should be held successfully once. heldTimerCount: " + heldTimerCount, 1,
609        heldTimerCount);
610      double heldTime = metrics.getUserRegionLockHeldTimer().getSnapshot().getMax();
611      assertTrue("Max held time should be greater than 2 seconds. heldTime: " + heldTime,
612        heldTime >= 2E9);
613    }
614  }
615
616  private final class ClientThread extends Thread {
617    private Exception exception;
618    private ConnectionImplementation connection;
619
620    private ClientThread(ConnectionImplementation connection) {
621      this.connection = connection;
622    }
623
624    @Override
625    public void run() {
626      byte[] currentKey = HConstants.EMPTY_START_ROW;
627      try {
628        connection.getRegionLocation(TABLE_NAME, currentKey, true);
629      } catch (IOException e) {
630        LOG.error("Thread id: " + this.getId() + "  exception: ", e);
631        this.exception = e;
632      }
633    }
634
635    public Exception getException() {
636      return exception;
637    }
638  }
639
640  public static class LockSleepInjector extends ExceptionInjector {
641    @Override
642    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
643      try {
644        Thread.sleep(5000);
645      } catch (InterruptedException e) {
646        LOG.info("Interrupted exception", e);
647      }
648    }
649
650    @Override
651    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) {
652    }
653
654    @Override
655    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) {
656    }
657  }
658}