001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Modifier;
030import java.net.SocketTimeoutException;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Set;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.SynchronousQueue;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.stream.Collectors;
043import java.util.stream.IntStream;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.CallDroppedException;
046import org.apache.hadoop.hbase.CallQueueTooBigException;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseServerException;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.RegionLocations;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.Waiter;
057import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.exceptions.RegionMovedException;
060import org.apache.hadoop.hbase.filter.Filter;
061import org.apache.hadoop.hbase.filter.FilterBase;
062import org.apache.hadoop.hbase.ipc.RpcClient;
063import org.apache.hadoop.hbase.master.HMaster;
064import org.apache.hadoop.hbase.regionserver.HRegion;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.Region;
067import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.JVMClusterUtil;
072import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
073import org.apache.hadoop.hbase.util.ReflectionUtils;
074import org.apache.hadoop.hbase.util.Threads;
075import org.junit.After;
076import org.junit.AfterClass;
077import org.junit.Assert;
078import org.junit.BeforeClass;
079import org.junit.ClassRule;
080import org.junit.Ignore;
081import org.junit.Rule;
082import org.junit.Test;
083import org.junit.experimental.categories.Category;
084import org.junit.rules.TestName;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
089import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
090import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
091import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
092
093/**
094 * This class is for testing HBaseConnectionManager features
095 */
096@Category({ LargeTests.class })
097public class TestConnectionImplementation {
098
  /** Class-level JUnit rule built for this test class via {@code HBaseClassTestRule.forClass}. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnectionImplementation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
  // Shared testing utility; the mini cluster behind it is started in setUpBeforeClass().
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Fixed table names; most tests derive their table name from the test method name instead.
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final TableName TABLE_NAME1 = TableName.valueOf("test1");
  private static final TableName TABLE_NAME2 = TableName.valueOf("test2");
  private static final TableName TABLE_NAME3 = TableName.valueOf("test3");
  // Column family used by the tables created in these tests.
  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  // Row keys used by the puts/gets issued from the tests.
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final byte[] ROW_X = Bytes.toBytes("xxx");
  // Client retry count installed into the shared configuration before the cluster starts.
  private static final int RPC_RETRY = 5;

  /** Exposes the running test method's name; several tests use it as their table name. */
  @Rule
  public TestName name = new TestName();
116
117  @BeforeClass
118  public static void setUpBeforeClass() throws Exception {
119    ResourceLeakDetector.setLevel(Level.PARANOID);
120    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
121    // Up the handlers; this test needs more than usual.
122    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
123    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
124    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
125    TEST_UTIL.startMiniCluster(2);
126
127  }
128
  /**
   * Shuts down the mini cluster once every test in this class has run.
   * @throws Exception if the shutdown fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
133
134  @After
135  public void tearDown() throws IOException {
136    TEST_UTIL.getAdmin().balancerSwitch(true, true);
137  }
138
139  @Test
140  public void testClusterConnection() throws IOException {
141    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS,
142      new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d")
143        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
144
145    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
146    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
147    // make sure the internally created ExecutorService is the one passed
148    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
149
150    final TableName tableName = TableName.valueOf(name.getMethodName());
151    TEST_UTIL.createTable(tableName, FAM_NAM).close();
152    Table table = con1.getTable(tableName, otherPool);
153
154    ExecutorService pool = null;
155
156    if (table instanceof HTable) {
157      HTable t = (HTable) table;
158      // make sure passing a pool to the getTable does not trigger creation of an internal pool
159      assertNull("Internal Thread pool should be null",
160        ((ConnectionImplementation) con1).getCurrentBatchPool());
161      // table should use the pool passed
162      assertTrue(otherPool == t.getPool());
163      t.close();
164
165      t = (HTable) con2.getTable(tableName);
166      // table should use the connectin's internal pool
167      assertTrue(otherPool == t.getPool());
168      t.close();
169
170      t = (HTable) con2.getTable(tableName);
171      // try other API too
172      assertTrue(otherPool == t.getPool());
173      t.close();
174
175      t = (HTable) con2.getTable(tableName);
176      // try other API too
177      assertTrue(otherPool == t.getPool());
178      t.close();
179
180      t = (HTable) con1.getTable(tableName);
181      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
182      // make sure an internal pool was created
183      assertNotNull("An internal Thread pool should have been created", pool);
184      // and that the table is using it
185      assertTrue(t.getPool() == pool);
186      t.close();
187
188      t = (HTable) con1.getTable(tableName);
189      // still using the *same* internal pool
190      assertTrue(t.getPool() == pool);
191      t.close();
192    } else {
193      table.close();
194    }
195
196    con1.close();
197
198    // if the pool was created on demand it should be closed upon connection close
199    if (pool != null) {
200      assertTrue(pool.isShutdown());
201    }
202
203    con2.close();
204    // if the pool is passed, it is not closed
205    assertFalse(otherPool.isShutdown());
206    otherPool.shutdownNow();
207  }
208
209  /**
210   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
211   * @throws IOException Unable to construct admin
212   */
213  @Test
214  public void testAdminFactory() throws IOException {
215    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
216    Admin admin = con1.getAdmin();
217    assertTrue(admin.getConnection() == con1);
218    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
219    con1.close();
220  }
221
222  // Fails too often! Needs work. HBASE-12558
223  // May only fail on non-linux machines? E.g. macosx.
224  @Ignore
225  @Test(expected = RegionServerStoppedException.class)
226  // Depends on mulitcast messaging facility that seems broken in hbase2
227  // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
228  // dead servers is broke"
229  public void testClusterStatus() throws Exception {
230    final TableName tableName = TableName.valueOf(name.getMethodName());
231    byte[] cf = "cf".getBytes();
232    byte[] rk = "rk1".getBytes();
233
234    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
235    rs.waitForServerOnline();
236    final ServerName sn = rs.getRegionServer().getServerName();
237
238    Table t = TEST_UTIL.createTable(tableName, cf);
239    TEST_UTIL.waitTableAvailable(tableName);
240    TEST_UTIL.waitUntilNoRegionsInTransition();
241
242    final ConnectionImplementation hci = (ConnectionImplementation) TEST_UTIL.getConnection();
243    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
244      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
245        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(),
246          sn);
247        TEST_UTIL.waitUntilNoRegionsInTransition();
248        hci.clearRegionCache(tableName);
249      }
250      Assert.assertNotNull(hci.clusterStatusListener);
251      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
252    }
253
254    Put p1 = new Put(rk);
255    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
256    t.put(p1);
257
258    rs.getRegionServer().abort("I'm dead");
259
260    // We want the status to be updated. That's a least 10 second
261    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
262      @Override
263      public boolean evaluate() throws Exception {
264        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().getDeadServers()
265          .isDeadServer(sn);
266      }
267    });
268
269    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
270      @Override
271      public boolean evaluate() throws Exception {
272        return hci.clusterStatusListener.isDeadServer(sn);
273      }
274    });
275
276    t.close();
277    hci.getClient(sn); // will throw an exception: RegionServerStoppedException
278  }
279
  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   * Runs the shared scenario in {@code testConnectionClose} with {@code allowsInterrupt} set to
   * {@code true}.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }
287
  /**
   * Runs the shared scenario in {@code testConnectionClose} with {@code allowsInterrupt} set to
   * {@code false}.
   */
  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }
292
293  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
294    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
295    TEST_UTIL.createTable(tableName, FAM_NAM).close();
296
297    TEST_UTIL.getAdmin().balancerSwitch(false, true);
298
299    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
300    // We want to work on a separate connection.
301    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
302    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
303    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
304    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
305    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
306    // to avoid the client to be stuck when do the Get
307    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
308    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
309    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
310
311    Connection connection = ConnectionFactory.createConnection(c2);
312    final Table table = connection.getTable(tableName);
313
314    Put put = new Put(ROW);
315    put.addColumn(FAM_NAM, ROW, ROW);
316    table.put(put);
317
318    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
319    final AtomicInteger step = new AtomicInteger(0);
320
321    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
322    Thread t = new Thread("testConnectionCloseThread") {
323      @Override
324      public void run() {
325        int done = 0;
326        try {
327          step.set(1);
328          while (step.get() == 1) {
329            Get get = new Get(ROW);
330            table.get(get);
331            done++;
332            if (done % 100 == 0) LOG.info("done=" + done);
333            // without the sleep, will cause the exception for too many files in
334            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
335            Thread.sleep(100);
336          }
337        } catch (Throwable t) {
338          failed.set(t);
339          LOG.error(t.toString(), t);
340        }
341        step.set(3);
342      }
343    };
344    t.start();
345    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
346      @Override
347      public boolean evaluate() throws Exception {
348        return step.get() == 1;
349      }
350    });
351
352    ServerName sn;
353    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
354      sn = rl.getRegionLocation(ROW).getServerName();
355    }
356    ConnectionImplementation conn = (ConnectionImplementation) connection;
357    RpcClient rpcClient = conn.getRpcClient();
358
359    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
360    for (int i = 0; i < 500; i++) {
361      rpcClient.cancelConnections(sn);
362      Thread.sleep(50);
363    }
364
365    step.compareAndSet(1, 2);
366    // The test may fail here if the thread doing the gets is stuck. The way to find
367    // out what's happening is to look for the thread named 'testConnectionCloseThread'
368    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
369      @Override
370      public boolean evaluate() throws Exception {
371        return step.get() == 3;
372      }
373    });
374    table.close();
375    connection.close();
376    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
377  }
378
379  /**
380   * Test that connection can become idle without breaking everything.
381   */
382  @Test
383  public void testConnectionIdle() throws Exception {
384    final TableName tableName = TableName.valueOf(name.getMethodName());
385    TEST_UTIL.createTable(tableName, FAM_NAM).close();
386    int idleTime = 20000;
387    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
388
389    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
390    // We want to work on a separate connection.
391    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
392    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
393    c2.setInt(RpcClient.IDLE_TIME, idleTime);
394
395    Connection connection = ConnectionFactory.createConnection(c2);
396    final Table table = connection.getTable(tableName);
397
398    Put put = new Put(ROW);
399    put.addColumn(FAM_NAM, ROW, ROW);
400    table.put(put);
401
402    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
403    mee.setValue(EnvironmentEdgeManager.currentTime());
404    EnvironmentEdgeManager.injectEdge(mee);
405    LOG.info("first get");
406    table.get(new Get(ROW));
407
408    LOG.info("first get - changing the time & sleeping");
409    mee.incValue(idleTime + 1000);
410    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
411                        // 1500 = sleep time in RpcClient#waitForWork + a margin
412
413    LOG.info("second get - connection has been marked idle in the middle");
414    // To check that the connection actually became idle would need to read some private
415    // fields of RpcClient.
416    table.get(new Get(ROW));
417    mee.incValue(idleTime + 1000);
418
419    LOG.info("third get - connection is idle, but the reader doesn't know yet");
420    // We're testing here a special case:
421    // time limit reached BUT connection not yet reclaimed AND a new call.
422    // in this situation, we don't close the connection, instead we use it immediately.
423    // If we're very unlucky we can have a race condition in the test: the connection is already
424    // under closing when we do the get, so we have an exception, and we don't retry as the
425    // retry number is 1. The probability is very very low, and seems acceptable for now. It's
426    // a test issue only.
427    table.get(new Get(ROW));
428
429    LOG.info("we're done - time will change back");
430
431    table.close();
432
433    connection.close();
434    EnvironmentEdgeManager.reset();
435    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
436  }
437
438  /**
439   * Test that the connection to the dead server is cut immediately when we receive the
440   * notification.
441   */
442  @Test
443  public void testConnectionCut() throws Exception {
444    final TableName tableName = TableName.valueOf(name.getMethodName());
445
446    TEST_UTIL.createTable(tableName, FAM_NAM).close();
447    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
448
449    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
450    // We want to work on a separate connection.
451    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
452    // try only once w/o any retry
453    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
454    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
455
456    final Connection connection = ConnectionFactory.createConnection(c2);
457    final Table table = connection.getTable(tableName);
458
459    Put p = new Put(FAM_NAM);
460    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
461    table.put(p);
462
463    final ConnectionImplementation hci = (ConnectionImplementation) connection;
464
465    final HRegionLocation loc;
466    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
467      loc = rl.getRegionLocation(FAM_NAM);
468    }
469
470    Get get = new Get(FAM_NAM);
471    Assert.assertNotNull(table.get(get));
472
473    get = new Get(FAM_NAM);
474    get.setFilter(new BlockingFilter());
475
476    // This thread will mark the server as dead while we're waiting during a get.
477    Thread t = new Thread() {
478      @Override
479      public void run() {
480        synchronized (syncBlockingFilter) {
481          try {
482            syncBlockingFilter.wait();
483          } catch (InterruptedException e) {
484            throw new RuntimeException(e);
485          }
486        }
487        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
488      }
489    };
490
491    t.start();
492    try {
493      table.get(get);
494      Assert.fail();
495    } catch (IOException expected) {
496      LOG.debug("Received: " + expected);
497      Assert.assertFalse(expected instanceof SocketTimeoutException);
498      Assert.assertFalse(syncBlockingFilter.get());
499    } finally {
500      syncBlockingFilter.set(true);
501      t.join();
502      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
503    }
504
505    table.close();
506    connection.close();
507  }
508
509  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
510
511  public static class BlockingFilter extends FilterBase {
512    @Override
513    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
514      int i = 0;
515      while (i++ < 1000 && !syncBlockingFilter.get()) {
516        synchronized (syncBlockingFilter) {
517          syncBlockingFilter.notifyAll();
518        }
519        Threads.sleep(100);
520      }
521      syncBlockingFilter.set(true);
522      return false;
523    }
524
525    @Override
526    public ReturnCode filterCell(final Cell ignored) throws IOException {
527      return ReturnCode.INCLUDE;
528    }
529
530    public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException {
531      return new BlockingFilter();
532    }
533  }
534
535  /**
536   * Test that when we delete a location using the first row of a region that we really delete it.
537   */
538  @Test
539  public void testRegionCaching() throws Exception {
540    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
541    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
542    // test with no retry, or client cache will get updated after the first failure
543    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
544    Connection connection = ConnectionFactory.createConnection(conf);
545    final Table table = connection.getTable(TABLE_NAME);
546
547    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
548    Put put = new Put(ROW);
549    put.addColumn(FAM_NAM, ROW, ROW);
550    table.put(put);
551
552    ConnectionImplementation conn = (ConnectionImplementation) connection;
553
554    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
555
556    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
557    // a location where the port is current port number +1 -- i.e. a non-existent location.
558    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
559    final int nextPort = loc.getPort() + 1;
560    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
561      ServerName.valueOf("127.0.0.1", nextPort, HConstants.LATEST_TIMESTAMP),
562      HConstants.LATEST_TIMESTAMP);
563    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort(),
564      nextPort);
565
566    conn.clearRegionCache(TABLE_NAME, ROW.clone());
567    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
568    assertNull("What is this location?? " + rl, rl);
569
570    // We're now going to move the region and check that it works for the client
571    // First a new put to add the location in the cache
572    conn.clearRegionCache(TABLE_NAME);
573    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
574    Put put2 = new Put(ROW);
575    put2.addColumn(FAM_NAM, ROW, ROW);
576    table.put(put2);
577    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
578    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
579
580    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
581    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
582
583    // We can wait for all regions to be online, that makes log reading easier when debugging
584    TEST_UTIL.waitUntilNoRegionsInTransition();
585
586    // Now moving the region to the second server
587    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
588    byte[] regionName = toMove.getRegionInfo().getRegionName();
589    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
590
591    // Choose the other server.
592    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
593    int destServerId = curServerId == 0 ? 1 : 0;
594
595    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
596    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
597
598    ServerName destServerName = destServer.getServerName();
599
600    // Check that we are in the expected state
601    Assert.assertTrue(curServer != destServer);
602    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
603    Assert.assertFalse(toMove.getPort() == destServerName.getPort());
604    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
605    Assert.assertNull(destServer.getOnlineRegion(regionName));
606    Assert.assertFalse(
607      TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition());
608
609    // Moving. It's possible that we don't have all the regions online at this point, so
610    // the test must depend only on the region we're looking at.
611    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
612    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
613
614    while (
615      destServer.getOnlineRegion(regionName) == null
616        || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
617        || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
618        || master.getAssignmentManager().hasRegionsInTransition()
619    ) {
620      // wait for the move to be finished
621      Thread.sleep(1);
622    }
623
624    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
625
626    // Check our new state.
627    Assert.assertNull(curServer.getOnlineRegion(regionName));
628    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
629    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
630    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
631
632    // Cache was NOT updated and points to the wrong server
633    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()
634        == destServerName.getPort());
635
636    // This part relies on a number of tries equals to 1.
637    // We do a put and expect the cache to be updated, even if we don't retry
638    LOG.info("Put starting");
639    Put put3 = new Put(ROW);
640    put3.addColumn(FAM_NAM, ROW, ROW);
641    try {
642      table.put(put3);
643      Assert.fail("Unreachable point");
644    } catch (RetriesExhaustedWithDetailsException e) {
645      LOG.info("Put done, exception caught: " + e.getClass());
646      Assert.assertEquals(1, e.getNumExceptions());
647      Assert.assertEquals(1, e.getCauses().size());
648      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
649
650      // Check that we unserialized the exception as expected
651      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
652      Assert.assertNotNull(cause);
653      Assert.assertTrue(cause instanceof RegionMovedException);
654    } catch (RetriesExhaustedException ree) {
655      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
656      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
657      LOG.info("Put done, exception caught: " + ree.getClass());
658      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
659      Assert.assertNotNull(cause);
660      Assert.assertTrue(cause instanceof RegionMovedException);
661    }
662    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
663    Assert.assertEquals("Previous server was " + curServer.getServerName().getAddress(),
664      destServerName.getPort(),
665      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
666
667    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
668    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
669
670    // We move it back to do another test with a scan
671    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
672    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(),
673      curServer.getServerName());
674
675    while (
676      curServer.getOnlineRegion(regionName) == null
677        || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
678        || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
679        || master.getAssignmentManager().hasRegionsInTransition()
680    ) {
681      // wait for the move to be finished
682      Thread.sleep(1);
683    }
684
685    // Check our new state.
686    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
687    Assert.assertNull(destServer.getOnlineRegion(regionName));
688    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
689
690    // Cache was NOT updated and points to the wrong server
691    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()
692        == curServer.getServerName().getPort());
693
694    Scan sc = new Scan();
695    sc.setStopRow(ROW);
696    sc.setStartRow(ROW);
697
698    // The scanner takes the max retries from the connection configuration, not the table as
699    // the put.
700    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
701
702    try {
703      ResultScanner rs = table.getScanner(sc);
704      while (rs.next() != null) {
705      }
706      Assert.fail("Unreachable point");
707    } catch (RetriesExhaustedException e) {
708      LOG.info("Scan done, expected exception caught: " + e.getClass());
709    }
710
711    // Cache is updated with the right value.
712    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
713    Assert.assertEquals("Previous server was " + destServer.getServerName().getAddress(),
714      curServer.getServerName().getPort(),
715      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
716
717    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
718    table.close();
719    connection.close();
720  }
721
722  /**
723   * Test that Connection or Pool are not closed when managed externally
724   */
725  @Test
726  public void testConnectionManagement() throws Exception {
727    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
728    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
729    Table table = conn.getTable(TABLE_NAME1);
730    table.close();
731    assertFalse(conn.isClosed());
732    if (table instanceof HTable) {
733      assertFalse(((HTable) table).getPool().isShutdown());
734    }
735    table = conn.getTable(TABLE_NAME1);
736    table.close();
737    if (table instanceof HTable) {
738      assertFalse(((HTable) table).getPool().isShutdown());
739    }
740    conn.close();
741    if (table instanceof HTable) {
742      assertTrue(((HTable) table).getPool().isShutdown());
743    }
744    table0.close();
745  }
746
747  /**
748   * Test that stale cache updates don't override newer cached values.
749   */
750  @Test
751  public void testCacheSeqNums() throws Exception {
752    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
753    Put put = new Put(ROW);
754    put.addColumn(FAM_NAM, ROW, ROW);
755    table.put(put);
756    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
757
758    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
759    assertNotNull(location);
760
761    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
762
763    // Same server as already in cache reporting - overwrites any value despite seqNum.
764    int nextPort = location.getPort() + 1;
765    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
766      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
767    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
768    Assert.assertEquals(nextPort, location.getPort());
769
770    // No source specified - same.
771    nextPort = location.getPort() + 1;
772    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
773      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
774    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
775    Assert.assertEquals(nextPort, location.getPort());
776
777    // Higher seqNum - overwrites lower seqNum.
778    nextPort = location.getPort() + 1;
779    conn.updateCachedLocation(location.getRegionInfo(), anySource,
780      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
781    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
782    Assert.assertEquals(nextPort, location.getPort());
783
784    // Lower seqNum - does not overwrite higher seqNum.
785    nextPort = location.getPort() + 1;
786    conn.updateCachedLocation(location.getRegionInfo(), anySource,
787      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
788    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
789    Assert.assertEquals(nextPort - 1, location.getPort());
790    table.close();
791  }
792
793  @Test
794  public void testClosing() throws Exception {
795    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
796    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
797      String.valueOf(ThreadLocalRandom.current().nextInt()));
798
799    // as connection caching is going away, now we're just testing
800    // that closed connection does actually get closed.
801
802    Connection c1 = ConnectionFactory.createConnection(configuration);
803    Connection c2 = ConnectionFactory.createConnection(configuration);
804    // no caching, different connections
805    assertTrue(c1 != c2);
806
807    // closing independently
808    c1.close();
809    assertTrue(c1.isClosed());
810    assertFalse(c2.isClosed());
811
812    c2.close();
813    assertTrue(c2.isClosed());
814  }
815
816  /**
817   * Trivial test to verify that nobody messes with
818   * {@link ConnectionFactory#createConnection(Configuration)}
819   */
820  @Test
821  public void testCreateConnection() throws Exception {
822    Configuration configuration = TEST_UTIL.getConfiguration();
823    Connection c1 = ConnectionFactory.createConnection(configuration);
824    Connection c2 = ConnectionFactory.createConnection(configuration);
825    // created from the same configuration, yet they are different
826    assertTrue(c1 != c2);
827    assertTrue(c1.getConfiguration() == c2.getConfiguration());
828  }
829
830  /**
831   * This test checks that one can connect to the cluster with only the ZooKeeper quorum set. Other
832   * stuff like master address will be read from ZK by the client.
833   */
834  @Test
835  public void testConnection() throws Exception {
836    // We create an empty config and add the ZK address.
837    Configuration c = new Configuration();
838    // This test only makes sense for ZK based connection registry.
839    c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
840      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
841    c.set(HConstants.ZOOKEEPER_QUORUM,
842      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
843    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
844      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
845    // This should be enough to connect
846    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
847    assertTrue(conn.isMasterRunning());
848    conn.close();
849  }
850
851  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
852    Field numTries = hci.getClass().getDeclaredField("numTries");
853    numTries.setAccessible(true);
854    Field modifiersField = ReflectionUtils.getModifiersField();
855    modifiersField.setAccessible(true);
856    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
857    final int prevNumRetriesVal = (Integer) numTries.get(hci);
858    numTries.set(hci, newVal);
859
860    return prevNumRetriesVal;
861  }
862
863  @Test
864  public void testMulti() throws Exception {
865    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
866    try {
867      ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
868
869      // We're now going to move the region and check that it works for the client
870      // First a new put to add the location in the cache
871      conn.clearRegionCache(TABLE_NAME3);
872      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
873
874      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
875      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
876
877      // We can wait for all regions to be online, that makes log reading easier when debugging
878      TEST_UTIL.waitUntilNoRegionsInTransition();
879
880      Put put = new Put(ROW_X);
881      put.addColumn(FAM_NAM, ROW_X, ROW_X);
882      table.put(put);
883
884      // Now moving the region to the second server
885      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
886      byte[] regionName = toMove.getRegionInfo().getRegionName();
887      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
888
889      // Choose the other server.
890      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
891      int destServerId = (curServerId == 0 ? 1 : 0);
892
893      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
894      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
895
896      ServerName destServerName = destServer.getServerName();
897      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
898
899      // find another row in the cur server that is less than ROW_X
900      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
901      byte[] otherRow = null;
902      for (Region region : regions) {
903        if (
904          !region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
905            && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0
906        ) {
907          otherRow = region.getRegionInfo().getStartKey();
908          break;
909        }
910      }
911      assertNotNull(otherRow);
912      // If empty row, set it to first row.-f
913      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
914      Put put2 = new Put(otherRow);
915      put2.addColumn(FAM_NAM, otherRow, otherRow);
916      table.put(put2); // cache put2's location
917
918      // Check that we are in the expected state
919      Assert.assertTrue(curServer != destServer);
920      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
921      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
922      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
923      Assert.assertNull(destServer.getOnlineRegion(regionName));
924      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager()
925        .hasRegionsInTransition());
926
927      // Moving. It's possible that we don't have all the regions online at this point, so
928      // the test depends only on the region we're looking at.
929      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
930      TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
931
932      while (
933        destServer.getOnlineRegion(regionName) == null
934          || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
935          || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
936          || master.getAssignmentManager().hasRegionsInTransition()
937      ) {
938        // wait for the move to be finished
939        Thread.sleep(1);
940      }
941
942      LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
943
944      // Check our new state.
945      Assert.assertNull(curServer.getOnlineRegion(regionName));
946      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
947      Assert
948        .assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
949      Assert
950        .assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
951
952      // Cache was NOT updated and points to the wrong server
953      Assert.assertFalse(conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation().getPort()
954          == destServerName.getPort());
955
956      // Hijack the number of retry to fail after 2 tries
957      final int prevNumRetriesVal = setNumTries(conn, 2);
958
959      Put put3 = new Put(ROW_X);
960      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
961      Put put4 = new Put(otherRow);
962      put4.addColumn(FAM_NAM, otherRow, otherRow);
963
964      // do multi
965      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
966      table.batch(actions, null); // first should be a valid row,
967      // second we get RegionMovedException.
968
969      setNumTries(conn, prevNumRetriesVal);
970    } finally {
971      table.close();
972    }
973  }
974
975  @Test
976  public void testErrorBackoffTimeCalculation() throws Exception {
977    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
978    final long ANY_PAUSE = 100;
979    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
980    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
981
982    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
983    EnvironmentEdgeManager.injectEdge(timeMachine);
984    try {
985      long largeAmountOfTime = ANY_PAUSE * 1000;
986      ConnectionImplementation.ServerErrorTracker tracker =
987        new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
988
989      // The default backoff is 0.
990      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
991
992      // Check some backoff values from HConstants sequence.
993      tracker.reportServerError(location);
994      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
995        tracker.calculateBackoffTime(location, ANY_PAUSE));
996      tracker.reportServerError(location);
997      tracker.reportServerError(location);
998      tracker.reportServerError(location);
999      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1000        tracker.calculateBackoffTime(location, ANY_PAUSE));
1001
1002      // All of this shouldn't affect backoff for different location.
1003      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1004      tracker.reportServerError(diffLocation);
1005      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1006        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1007
1008      // Check with different base.
1009      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1010        tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1011    } finally {
1012      EnvironmentEdgeManager.reset();
1013    }
1014  }
1015
1016  private static void assertEqualsWithJitter(long expected, long actual) {
1017    assertEqualsWithJitter(expected, actual, expected);
1018  }
1019
1020  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1021    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1022      Math.abs(actual - expected) <= (0.01f * jitterBase));
1023  }
1024
1025  @Test
1026  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1027    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1028    // This test only makes sense for ZK based connection registry.
1029    config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
1030      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
1031
1032    final TableName tableName = TableName.valueOf(name.getMethodName());
1033    TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
1034
1035    Connection connection = ConnectionFactory.createConnection(config);
1036    Table table = connection.getTable(tableName);
1037
1038    // this will cache the meta location and table's region location
1039    table.get(new Get(Bytes.toBytes("foo")));
1040
1041    // restart HBase
1042    TEST_UTIL.shutdownMiniHBaseCluster();
1043    TEST_UTIL.restartHBaseCluster(2);
1044    // this should be able to discover new locations for meta and table's region
1045    table.get(new Get(Bytes.toBytes("foo")));
1046    TEST_UTIL.deleteTable(tableName);
1047    table.close();
1048    connection.close();
1049  }
1050
1051  @Test
1052  public void testLocateRegionsWithRegionReplicas() throws IOException {
1053    int regionReplication = 3;
1054    byte[] family = Bytes.toBytes("cf");
1055    TableName tableName = TableName.valueOf(name.getMethodName());
1056
1057    // Create a table with region replicas
1058    TableDescriptorBuilder builder =
1059      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
1060        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1061    TEST_UTIL.getAdmin().createTable(builder.build());
1062
1063    try (ConnectionImplementation con =
1064      (ConnectionImplementation) ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
1065      // Get locations of the regions of the table
1066      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1067
1068      // The size of the returned locations should be 3
1069      assertEquals(regionReplication, locations.size());
1070
1071      // The replicaIds of the returned locations should be 0, 1 and 2
1072      Set<Integer> expectedReplicaIds =
1073        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
1074      for (HRegionLocation location : locations) {
1075        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1076      }
1077    } finally {
1078      TEST_UTIL.deleteTable(tableName);
1079    }
1080  }
1081
1082  @Test
1083  public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException {
1084    testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class);
1085  }
1086
1087  @Test
1088  public void testLocateRegionsRetrySpecialPauseCDE() throws IOException {
1089    testLocateRegionsRetrySpecialPause(CallDroppedException.class);
1090  }
1091
1092  private void testLocateRegionsRetrySpecialPause(
1093    Class<? extends HBaseServerException> exceptionClass) throws IOException {
1094
1095    int regionReplication = 3;
1096    byte[] family = Bytes.toBytes("cf");
1097    TableName tableName = TableName.valueOf(name.getMethodName());
1098
1099    // Create a table with region replicas
1100    TableDescriptorBuilder builder =
1101      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
1102        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1103    TEST_UTIL.getAdmin().createTable(builder.build());
1104
1105    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
1106
1107    conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, ThrowingCallerFactory.class,
1108      RpcRetryingCallerFactory.class);
1109    conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class);
1110
1111    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
1112    // normal pause very short, 10 millis
1113    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10);
1114
1115    // special pause 10x longer, so we can detect it
1116    long specialPauseMillis = 1000;
1117    conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
1118      specialPauseMillis);
1119
1120    try (ConnectionImplementation con =
1121      (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
1122      // Get locations of the regions of the table
1123
1124      long start = System.nanoTime();
1125      try {
1126        con.locateRegion(tableName, new byte[0], false, true, 0);
1127      } catch (HBaseServerException e) {
1128        assertTrue(e.isServerOverloaded());
1129        // pass: expected
1130      }
1131      assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis));
1132    } finally {
1133      TEST_UTIL.deleteTable(tableName);
1134    }
1135  }
1136
1137  private static class ThrowingCallerFactory extends RpcRetryingCallerFactory {
1138
1139    private final Class<? extends HBaseServerException> exceptionClass;
1140
1141    public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) {
1142      super(conf, connectionConfig);
1143      this.exceptionClass =
1144        conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
1145    }
1146
1147    @Override
1148    public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
1149      return newCaller();
1150    }
1151
1152    @Override
1153    public <T> RpcRetryingCaller<T> newCaller() {
1154      return new RpcRetryingCaller<T>() {
1155        @Override
1156        public void cancel() {
1157
1158        }
1159
1160        @Override
1161        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
1162          throws IOException, RuntimeException {
1163          return callWithoutRetries(null, 0);
1164        }
1165
1166        @Override
1167        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
1168          throws IOException, RuntimeException {
1169          try {
1170            throw exceptionClass.getConstructor().newInstance();
1171          } catch (IllegalAccessException | InstantiationException | InvocationTargetException
1172            | NoSuchMethodException e) {
1173            throw new RuntimeException(e);
1174          }
1175        }
1176      };
1177    }
1178  }
1179
1180  @Test
1181  public void testMetaLookupThreadPoolCreated() throws Exception {
1182    final TableName tableName = TableName.valueOf(name.getMethodName());
1183    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1184    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1185      TEST_UTIL.getAdmin().disableTable(tableName);
1186      TEST_UTIL.getAdmin().deleteTable(tableName);
1187    }
1188    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1189      byte[] row = Bytes.toBytes("test");
1190      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1191      // check that metalookup pool would get created
1192      c.relocateRegion(tableName, row);
1193      ExecutorService ex = c.getCurrentMetaLookupPool();
1194      assertNotNull(ex);
1195    }
1196  }
1197
1198  // There is no assertion, but you need to confirm that there is no resource leak output from netty
1199  @Test
1200  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
1201    TableName tableName = TableName.valueOf(name.getMethodName());
1202    TEST_UTIL.createTable(tableName, FAM_NAM).close();
1203    TEST_UTIL.getAdmin().balancerSwitch(false, true);
1204    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
1205      Table table = connection.getTable(tableName)) {
1206      table.get(new Get(Bytes.toBytes("1")));
1207      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
1208      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
1209      rpcClient.cancelConnections(sn);
1210      Thread.sleep(1000);
1211      System.gc();
1212      Thread.sleep(1000);
1213    }
1214  }
1215}