001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.mockito.Mockito;
046
047/**
048 * The purpose of this test is to make sure the region exception won't corrupt the results of batch.
049 * The prescription is shown below. 1) honor the action result rather than region exception. If the
050 * action have both of true result and region exception, the action is fine as the exception is
051 * caused by other actions which are in the same region. 2) honor the action exception rather than
052 * region exception. If the action have both of action exception and region exception, we deal with
053 * the action exception only. If we also handle the region exception for the same action, it will
054 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
055 * block forever. This bug can be reproduced by real use case. see TestMalformedCellFromClient(in
056 * branch-1.4+). It uses the batch of RowMutations to present the bug. Given that the batch of
057 * RowMutations is only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't
058 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we ignore
059 * some write paths.
060 */
061@Category({ ClientTests.class, SmallTests.class })
062public class TestAsyncProcessWithRegionException {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
067
068  private static final Result EMPTY_RESULT = Result.create(null, true);
069  private static final IOException IOE = new IOException("YOU CAN'T PASS");
070  private static final Configuration CONF = new Configuration();
071  private static final ConnectionConfiguration CONNECTION_CONFIG =
072    new ConnectionConfiguration(CONF);
073  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
074  private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
075  private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
076  private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
077    Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
078  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
079  private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
080  private static final RegionInfo REGION_INFO =
081    RegionInfoBuilder.newBuilder(DUMMY_TABLE).setStartKey(HConstants.EMPTY_START_ROW)
082      .setEndKey(HConstants.EMPTY_END_ROW).setSplit(false).setRegionId(1).build();
083
084  private static final HRegionLocation REGION_LOCATION =
085    new HRegionLocation(REGION_INFO, SERVER_NAME);
086
087  @BeforeClass
088  public static void setUpBeforeClass() {
089    // disable the retry
090    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
091  }
092
093  @Test
094  public void testSuccessivePut() throws Exception {
095    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
096
097    List<Put> puts = new ArrayList<>(1);
098    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
099    final int expectedSize = puts.size();
100    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
101    arf.waitUntilDone();
102    Object[] result = arf.getResults();
103    assertEquals(expectedSize, result.length);
104    for (Object r : result) {
105      assertEquals(Result.class, r.getClass());
106    }
107    assertTrue(puts.isEmpty());
108    assertActionsInProgress(arf);
109  }
110
111  @Test
112  public void testFailedPut() throws Exception {
113    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
114
115    List<Put> puts = new ArrayList<>(2);
116    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
117    // this put should fail
118    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
119    final int expectedSize = puts.size();
120
121    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
122    arf.waitUntilDone();
123    // There is a failed puts
124    assertError(arf, 1);
125    Object[] result = arf.getResults();
126    assertEquals(expectedSize, result.length);
127    assertEquals(Result.class, result[0].getClass());
128    assertTrue(result[1] instanceof IOException);
129    assertTrue(puts.isEmpty());
130    assertActionsInProgress(arf);
131  }
132
133  @Test
134  public void testFailedPutWithoutActionException() throws Exception {
135    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
136
137    List<Put> puts = new ArrayList<>(3);
138    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
139    // this put should fail
140    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
141    // this put should fail, and it won't have action exception
142    puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
143    final int expectedSize = puts.size();
144
145    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
146    arf.waitUntilDone();
147    // There are two failed puts
148    assertError(arf, 2);
149    Object[] result = arf.getResults();
150    assertEquals(expectedSize, result.length);
151    assertEquals(Result.class, result[0].getClass());
152    assertTrue(result[1] instanceof IOException);
153    assertTrue(result[2] instanceof IOException);
154    assertTrue(puts.isEmpty());
155    assertActionsInProgress(arf);
156  }
157
158  private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
159    assertTrue(arf.hasError());
160    RetriesExhaustedWithDetailsException e = arf.getErrors();
161    List<Throwable> errors = e.getCauses();
162    assertEquals(expectedCountOfFailure, errors.size());
163    for (Throwable t : errors) {
164      assertTrue(t instanceof IOException);
165    }
166  }
167
168  private static void assertActionsInProgress(AsyncRequestFuture arf) {
169    if (arf instanceof AsyncRequestFutureImpl) {
170      assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
171    }
172  }
173
174  private static ClusterConnection createHConnection() throws IOException {
175    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
176    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
177    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
178    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
179    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
180    Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
181    setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
182    setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
183    Mockito
184      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
185      .thenReturn(Collections.singletonList(REGION_LOCATION));
186    return hc;
187  }
188
189  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
190    throws IOException {
191    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
192      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
193    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
194      Mockito.anyBoolean())).thenReturn(result);
195  }
196
197  private static class MyAsyncProcess extends AsyncProcess {
198    private final ExecutorService service = Executors.newFixedThreadPool(5);
199
200    MyAsyncProcess(ClusterConnection hc, Configuration conf) {
201      super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
202        new RpcControllerFactory(conf));
203    }
204
205    public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
206      throws InterruptedIOException {
207      return submit(AsyncProcessTask.newBuilder().setPool(service).setTableName(tableName)
208        .setRowAccess(rows).setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
209        .setNeedResults(true).setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
210        .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT).build());
211    }
212
213    @Override
214    protected RpcRetryingCaller<AbstractResponse>
215      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
216      MultiServerCallable callable1 = (MultiServerCallable) callable;
217      MultiResponse mr = new MultiResponse();
218      callable1.getMulti().actions.forEach((regionName, actions) -> {
219        actions.forEach(action -> {
220          if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
221            mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
222          } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
223            mr.add(regionName, action.getOriginalIndex(), IOE);
224          }
225        });
226      });
227      mr.addException(REGION_INFO.getRegionName(), IOE);
228      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0,
229        RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
230        @Override
231        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
232          int callTimeout) {
233          try {
234            // sleep one second in order for threadpool to start another thread instead of reusing
235            // existing one.
236            Thread.sleep(1000);
237          } catch (InterruptedException e) {
238            // pass
239          }
240          return mr;
241        }
242      };
243    }
244  }
245}