001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ExecutionException; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.ExtendedCell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.ipc.HBaseRpcController; 042import org.apache.hadoop.hbase.regionserver.HRegion; 043import org.apache.hadoop.hbase.regionserver.HRegionServer; 044import org.apache.hadoop.hbase.testclassification.ClientTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.mockito.Mockito; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 059import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 062 063/** 064 * The purpose of this test is to ensure whether rs deals with the malformed cells correctly. 065 */ 066@Category({ MediumTests.class, ClientTests.class }) 067public class TestMalformedCellFromClient { 068 private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class); 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); 072 073 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 074 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 075 private static final int CELL_SIZE = 100; 076 private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient"); 077 078 @BeforeClass 079 public static void setUpBeforeClass() throws Exception { 080 // disable the retry 081 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 082 TEST_UTIL.startMiniCluster(1); 083 } 084 085 @Before 086 public void before() throws Exception { 087 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 088 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 089 .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build(); 090 TEST_UTIL.getConnection().getAdmin().createTable(desc); 091 } 092 093 @After 094 public void tearDown() throws Exception { 095 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 096 TEST_UTIL.deleteTable(htd.getTableName()); 097 } 098 } 099 100 @AfterClass 101 public static void tearDownAfterClass() throws Exception { 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 105 /** 106 * The purpose of this ut is to check the consistency between the exception and results. If the 107 * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE. 108 * Otherwise, the row operation which is not in the exception should have a true result. 109 */ 110 @Test 111 public void testRegionException() throws InterruptedException, IOException { 112 List<Row> batches = new ArrayList<>(); 113 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 114 // the rm is used to prompt the region exception. 115 // see RSRpcServices#multi 116 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 117 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 118 batches.add(rm); 119 Object[] results = new Object[batches.size()]; 120 121 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 122 Throwable exceptionByCaught = null; 123 try { 124 table.batch(batches, results); 125 fail("Where is the exception? We put the malformed cells!!!"); 126 } catch (RetriesExhaustedException e) { 127 exceptionByCaught = e.getCause(); 128 } 129 for (Object obj : results) { 130 assertNotNull(obj); 131 } 132 assertEquals(Result.class, results[0].getClass()); 133 assertEquals(exceptionByCaught.getClass(), results[1].getClass()); 134 Result result = table.get(new Get(Bytes.toBytes("good"))); 135 assertEquals(1, result.size()); 136 Cell cell = result.getColumnLatestCell(FAMILY, null); 137 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 138 } 139 } 140 141 /** 142 * This test verifies region exception doesn't corrupt the results of batch. The prescription is 143 * shown below. 1) honor the action result rather than region exception. If the action have both 144 * of true result and region exception, the action is fine as the exception is caused by other 145 * actions which are in the same region. 2) honor the action exception rather than region 146 * exception. If the action have both of action exception and region exception, we deal with the 147 * action exception only. If we also handle the region exception for the same action, it will 148 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will 149 * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each 150 * result should be of IOE. Otherwise, the row operation which is not in the exception should have 151 * a true result. The no-cluster test is in TestAsyncProcessWithRegionException. 152 */ 153 @Test 154 public void testRegionExceptionByAsync() throws Exception { 155 List<Row> batches = new ArrayList<>(); 156 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 157 // the rm is used to prompt the region exception. 158 // see RSRpcServices#multi 159 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 160 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 161 batches.add(rm); 162 try (AsyncConnection asyncConnection = 163 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 164 AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME); 165 List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches); 166 assertEquals(2, results.size()); 167 try { 168 results.get(1).get(); 169 fail("Where is the exception? We put the malformed cells!!!"); 170 } catch (ExecutionException e) { 171 // pass 172 } 173 Result result = table.get(new Get(Bytes.toBytes("good"))).get(); 174 assertEquals(1, result.size()); 175 Cell cell = result.getColumnLatestCell(FAMILY, null); 176 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 177 } 178 } 179 180 /** 181 * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed. 182 * Currently, we have no client api to submit the request consisting of condition-rm and mutation. 183 * Hence, this test build the request manually. 184 */ 185 @Test 186 public void testAtomicOperations() throws Exception { 187 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 188 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 189 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10])); 190 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]); 191 192 // build the request 193 HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 194 ClientProtos.MultiRequest request = 195 ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName())) 196 .addRegionAction(ClientProtos.RegionAction.newBuilder() 197 .setRegion(RequestConverter.buildRegionSpecifier( 198 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 199 r.getRegionInfo().getRegionName())) 200 .addAction(ClientProtos.Action.newBuilder().setMutation( 201 ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put)))) 202 .build(); 203 204 List<ExtendedCell> cells = new ArrayList<>(); 205 for (Mutation m : rm.getMutations()) { 206 cells.addAll(m.getCellList(FAMILY)); 207 } 208 cells.addAll(put.getCellList(FAMILY)); 209 assertEquals(3, cells.size()); 210 HBaseRpcController controller = Mockito.mock(HBaseRpcController.class); 211 Mockito.when(controller.cellScanner()) 212 .thenReturn(PrivateCellUtil.createExtendedCellScanner(cells)); 213 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL 214 .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName())); 215 216 ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request); 217 assertEquals(2, response.getRegionActionResultCount()); 218 assertTrue(response.getRegionActionResultList().get(0).hasException()); 219 assertFalse(response.getRegionActionResultList().get(1).hasException()); 220 assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount()); 221 assertTrue( 222 response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult()); 223 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 224 Result result = table.get(new Get(Bytes.toBytes("good"))); 225 assertEquals(1, result.size()); 226 Cell cell = result.getColumnLatestCell(FAMILY, null); 227 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 228 } 229 } 230 231 private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName) 232 throws IOException { 233 ClientProtos.RegionAction.Builder builder = RequestConverter 234 .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName); 235 builder.setAtomic(true); 236 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 237 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 238 ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null, 239 CompareOperator.EQUAL, new byte[10], null, null); 240 for (Mutation mutation : rm.getMutations()) { 241 ClientProtos.MutationProto.MutationType mutateType = null; 242 if (mutation instanceof Put) { 243 mutateType = ClientProtos.MutationProto.MutationType.PUT; 244 } else if (mutation instanceof Delete) { 245 mutateType = ClientProtos.MutationProto.MutationType.DELETE; 246 } else { 247 throw new DoNotRetryIOException( 248 "RowMutations supports only put and delete, not " + mutation.getClass().getName()); 249 } 250 mutationBuilder.clear(); 251 ClientProtos.MutationProto mp = 252 ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder); 253 actionBuilder.clear(); 254 actionBuilder.setMutation(mp); 255 builder.addAction(actionBuilder.build()); 256 } 257 ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder() 258 .addRegionAction(builder.setCondition(condition).build()).build(); 259 return request; 260 } 261 262 /** 263 * This test depends on how regionserver process the batch ops. 1) group the put/delete until 264 * meeting the increment 2) process the batch of put/delete 3) process the increment see 265 * RSRpcServices#doNonAtomicRegionMutation 266 */ 267 @Test 268 public void testNonAtomicOperations() throws InterruptedException, IOException { 269 Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100); 270 List<Row> batches = new ArrayList<>(); 271 // the first and second puts will be group by regionserver 272 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 273 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 274 // this Increment should succeed 275 batches.add(inc); 276 // this put should succeed 277 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1])); 278 Object[] objs = new Object[batches.size()]; 279 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 280 table.batch(batches, objs); 281 fail("Where is the exception? We put the malformed cells!!!"); 282 } catch (RetriesExhaustedException e) { 283 Throwable error = e.getCause(); 284 for (;;) { 285 assertNotNull("Can not find a DoNotRetryIOException on stack trace", error); 286 if (error instanceof DoNotRetryIOException) { 287 break; 288 } 289 error = error.getCause(); 290 } 291 } finally { 292 assertObjects(objs, batches.size()); 293 assertTrue(objs[0] instanceof IOException); 294 assertTrue(objs[1] instanceof IOException); 295 assertEquals(Result.class, objs[2].getClass()); 296 assertEquals(Result.class, objs[3].getClass()); 297 } 298 } 299 300 @Test 301 public void testRowMutations() throws InterruptedException, IOException { 302 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]); 303 List<Row> batches = new ArrayList<>(); 304 RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); 305 // the first and second puts will be group by regionserver 306 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 307 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 308 batches.add(mutations); 309 // this bm should succeed 310 mutations = new RowMutations(Bytes.toBytes("good")); 311 mutations.add(put); 312 mutations.add(put); 313 batches.add(mutations); 314 Object[] objs = new Object[batches.size()]; 315 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 316 table.batch(batches, objs); 317 fail("Where is the exception? We put the malformed cells!!!"); 318 } catch (RetriesExhaustedException e) { 319 Throwable error = e.getCause(); 320 for (;;) { 321 assertNotNull("Can not find a DoNotRetryIOException on stack trace", error); 322 if (error instanceof DoNotRetryIOException) { 323 break; 324 } 325 error = error.getCause(); 326 } 327 } finally { 328 assertObjects(objs, batches.size()); 329 assertTrue(objs[0] instanceof IOException); 330 assertEquals(Result.class, objs[1].getClass()); 331 } 332 } 333 334 private static void assertObjects(Object[] objs, int expectedSize) { 335 int count = 0; 336 for (Object obj : objs) { 337 assertNotNull(obj); 338 ++count; 339 } 340 assertEquals(expectedSize, count); 341 } 342}