001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045
046import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
047
048@Category({ MediumTests.class, ClientTests.class })
049public class TestAsyncSingleRequestRpcRetryingCaller {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
054
055  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
056
057  private static TableName TABLE_NAME = TableName.valueOf("async");
058
059  private static byte[] FAMILY = Bytes.toBytes("cf");
060
061  private static byte[] QUALIFIER = Bytes.toBytes("cq");
062
063  private static byte[] ROW = Bytes.toBytes("row");
064
065  private static byte[] VALUE = Bytes.toBytes("value");
066
067  private static AsyncConnectionImpl CONN;
068
069  @BeforeClass
070  public static void setUpBeforeClass() throws Exception {
071    TEST_UTIL.startMiniCluster(2);
072    TEST_UTIL.getAdmin().balancerSwitch(false, true);
073    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
074    TEST_UTIL.waitTableAvailable(TABLE_NAME);
075    ConnectionRegistry registry =
076      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
077    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
078      registry.getClusterId().get(), User.getCurrent());
079  }
080
081  @AfterClass
082  public static void tearDownAfterClass() throws Exception {
083    Closeables.close(CONN, true);
084    TEST_UTIL.shutdownMiniCluster();
085  }
086
087  @Test
088  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
089    // This will leave a cached entry in location cache
090    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
091    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
092    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(),
093      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName());
094    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
095      .setMaxRetries(30).build();
096    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
097
098    // move back
099    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName());
100    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
101    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
102  }
103
104  private <T> CompletableFuture<T> failedFuture() {
105    CompletableFuture<T> future = new CompletableFuture<>();
106    future.completeExceptionally(new RuntimeException("Inject error!"));
107    return future;
108  }
109
110  @Test
111  public void testMaxRetries() throws IOException, InterruptedException {
112    try {
113      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
114        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
115        .action((controller, loc, stub) -> failedFuture()).call().get();
116      fail();
117    } catch (ExecutionException e) {
118      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
119    }
120  }
121
122  @Test
123  public void testOperationTimeout() throws IOException, InterruptedException {
124    long startNs = System.nanoTime();
125    try {
126      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
127        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
128        .action((controller, loc, stub) -> failedFuture()).call().get();
129      fail();
130    } catch (ExecutionException e) {
131      e.printStackTrace();
132      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
133    }
134    long costNs = System.nanoTime() - startNs;
135    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
136    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
137  }
138
139  @Test
140  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
141    AtomicBoolean errorTriggered = new AtomicBoolean(false);
142    AtomicInteger count = new AtomicInteger(0);
143    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
144    AsyncRegionLocator mockedLocator =
145      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
146        @Override
147        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
148          int replicaId, RegionLocateType locateType, long timeoutNs) {
149          if (tableName.equals(TABLE_NAME)) {
150            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
151            if (count.getAndIncrement() == 0) {
152              errorTriggered.set(true);
153              future.completeExceptionally(new RuntimeException("Inject error!"));
154            } else {
155              future.complete(loc);
156            }
157            return future;
158          } else {
159            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
160          }
161        }
162
163        @Override
164        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
165        }
166      };
167    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
168      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
169
170      @Override
171      AsyncRegionLocator getLocator() {
172        return mockedLocator;
173      }
174    }) {
175      AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
176        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
177      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
178      assertTrue(errorTriggered.get());
179      errorTriggered.set(false);
180      count.set(0);
181      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
182      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
183      assertTrue(errorTriggered.get());
184    }
185  }
186}