001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.RegionLocations; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.SmallTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.mockito.Mockito; 046 047/** 048 * The purpose of this test is to make sure the region exception won't corrupt the results of batch. 049 * The prescription is shown below. 1) honor the action result rather than region exception. If the 050 * action have both of true result and region exception, the action is fine as the exception is 051 * caused by other actions which are in the same region. 2) honor the action exception rather than 052 * region exception. If the action have both of action exception and region exception, we deal with 053 * the action exception only. If we also handle the region exception for the same action, it will 054 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will 055 * block forever. This bug can be reproduced by real use case. see TestMalformedCellFromClient(in 056 * branch-1.4+). It uses the batch of RowMutations to present the bug. Given that the batch of 057 * RowMutations is only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't 058 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we ignore 059 * some write paths. 060 */ 061@Category({ ClientTests.class, SmallTests.class }) 062public class TestAsyncProcessWithRegionException { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class); 067 068 private static final Result EMPTY_RESULT = Result.create(null, true); 069 private static final IOException IOE = new IOException("YOU CAN'T PASS"); 070 private static final Configuration CONF = new Configuration(); 071 private static final ConnectionConfiguration CONNECTION_CONFIG = 072 new ConnectionConfiguration(CONF); 073 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 074 private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); 075 private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); 076 private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION = 077 Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION"); 078 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 079 private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1"); 080 private static final RegionInfo REGION_INFO = 081 RegionInfoBuilder.newBuilder(DUMMY_TABLE).setStartKey(HConstants.EMPTY_START_ROW) 082 .setEndKey(HConstants.EMPTY_END_ROW).setSplit(false).setRegionId(1).build(); 083 084 private static final HRegionLocation REGION_LOCATION = 085 new HRegionLocation(REGION_INFO, SERVER_NAME); 086 087 @BeforeClass 088 public static void setUpBeforeClass() { 089 // disable the retry 090 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 091 } 092 093 @Test 094 public void testSuccessivePut() throws Exception { 095 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 096 097 List<Put> puts = new ArrayList<>(1); 098 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 099 final int expectedSize = puts.size(); 100 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 101 arf.waitUntilDone(); 102 Object[] result = arf.getResults(); 103 assertEquals(expectedSize, result.length); 104 for (Object r : result) { 105 assertEquals(Result.class, r.getClass()); 106 } 107 assertTrue(puts.isEmpty()); 108 assertActionsInProgress(arf); 109 } 110 111 @Test 112 public void testFailedPut() throws Exception { 113 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 114 115 List<Put> puts = new ArrayList<>(2); 116 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 117 // this put should fail 118 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 119 final int expectedSize = puts.size(); 120 121 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 122 arf.waitUntilDone(); 123 // There is a failed puts 124 assertError(arf, 1); 125 Object[] result = arf.getResults(); 126 assertEquals(expectedSize, result.length); 127 assertEquals(Result.class, result[0].getClass()); 128 assertTrue(result[1] instanceof IOException); 129 assertTrue(puts.isEmpty()); 130 assertActionsInProgress(arf); 131 } 132 133 @Test 134 public void testFailedPutWithoutActionException() throws Exception { 135 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 136 137 List<Put> puts = new ArrayList<>(3); 138 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 139 // this put should fail 140 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); 141 // this put should fail, and it won't have action exception 142 puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY)); 143 final int expectedSize = puts.size(); 144 145 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); 146 arf.waitUntilDone(); 147 // There are two failed puts 148 assertError(arf, 2); 149 Object[] result = arf.getResults(); 150 assertEquals(expectedSize, result.length); 151 assertEquals(Result.class, result[0].getClass()); 152 assertTrue(result[1] instanceof IOException); 153 assertTrue(result[2] instanceof IOException); 154 assertTrue(puts.isEmpty()); 155 assertActionsInProgress(arf); 156 } 157 158 private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) { 159 assertTrue(arf.hasError()); 160 RetriesExhaustedWithDetailsException e = arf.getErrors(); 161 List<Throwable> errors = e.getCauses(); 162 assertEquals(expectedCountOfFailure, errors.size()); 163 for (Throwable t : errors) { 164 assertTrue(t instanceof IOException); 165 } 166 } 167 168 private static void assertActionsInProgress(AsyncRequestFuture arf) { 169 if (arf instanceof AsyncRequestFutureImpl) { 170 assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress()); 171 } 172 } 173 174 private static ClusterConnection createHConnection() throws IOException { 175 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 176 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 177 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 178 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 179 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 180 Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); 181 setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); 182 setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); 183 Mockito 184 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 185 .thenReturn(Collections.singletonList(REGION_LOCATION)); 186 return hc; 187 } 188 189 private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) 190 throws IOException { 191 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 192 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 193 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 194 Mockito.anyBoolean())).thenReturn(result); 195 } 196 197 private static class MyAsyncProcess extends AsyncProcess { 198 private final ExecutorService service = Executors.newFixedThreadPool(5); 199 200 MyAsyncProcess(ClusterConnection hc, Configuration conf) { 201 super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), 202 new RpcControllerFactory(conf)); 203 } 204 205 public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows) 206 throws InterruptedIOException { 207 return submit(AsyncProcessTask.newBuilder().setPool(service).setTableName(tableName) 208 .setRowAccess(rows).setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL) 209 .setNeedResults(true).setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT) 210 .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT).build()); 211 } 212 213 @Override 214 protected RpcRetryingCaller<AbstractResponse> 215 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 216 MultiServerCallable callable1 = (MultiServerCallable) callable; 217 MultiResponse mr = new MultiResponse(); 218 callable1.getMulti().actions.forEach((regionName, actions) -> { 219 actions.forEach(action -> { 220 if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) { 221 mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT); 222 } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) { 223 mr.add(regionName, action.getOriginalIndex(), IOE); 224 } 225 }); 226 }); 227 mr.addException(REGION_INFO.getRegionName(), IOE); 228 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 229 RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { 230 @Override 231 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 232 int callTimeout) { 233 try { 234 // sleep one second in order for threadpool to start another thread instead of reusing 235 // existing one. 236 Thread.sleep(1000); 237 } catch (InterruptedException e) { 238 // pass 239 } 240 return mr; 241 } 242 }; 243 } 244 } 245}