001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ExecutionException; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.Before; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.mockito.Mockito; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 060 061/** 062 * The purpose of this test is to ensure whether rs deals with the malformed cells correctly. 063 */ 064@Category({ MediumTests.class, ClientTests.class }) 065public class TestMalformedCellFromClient { 066 private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class); 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); 070 071 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 072 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 073 private static final int CELL_SIZE = 100; 074 private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient"); 075 076 @BeforeClass 077 public static void setUpBeforeClass() throws Exception { 078 // disable the retry 079 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 080 TEST_UTIL.startMiniCluster(1); 081 } 082 083 @Before 084 public void before() throws Exception { 085 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 086 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 087 .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build(); 088 TEST_UTIL.getConnection().getAdmin().createTable(desc); 089 } 090 091 @After 092 public void tearDown() throws Exception { 093 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 094 TEST_UTIL.deleteTable(htd.getTableName()); 095 } 096 } 097 098 @AfterClass 099 public static void tearDownAfterClass() throws Exception { 100 TEST_UTIL.shutdownMiniCluster(); 101 } 102 103 /** 104 * The purpose of this ut is to check the consistency between the exception and results. If the 105 * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE. 106 * Otherwise, the row operation which is not in the exception should have a true result. 107 */ 108 @Test 109 public void testRegionException() throws InterruptedException, IOException { 110 List<Row> batches = new ArrayList<>(); 111 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 112 // the rm is used to prompt the region exception. 113 // see RSRpcServices#multi 114 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 115 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 116 batches.add(rm); 117 Object[] results = new Object[batches.size()]; 118 119 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 120 Throwable exceptionByCaught = null; 121 try { 122 table.batch(batches, results); 123 fail("Where is the exception? We put the malformed cells!!!"); 124 } catch (RetriesExhaustedWithDetailsException e) { 125 for (Throwable throwable : e.getCauses()) { 126 assertNotNull(throwable); 127 } 128 assertEquals(1, e.getNumExceptions()); 129 exceptionByCaught = e.getCause(0); 130 } 131 for (Object obj : results) { 132 assertNotNull(obj); 133 } 134 assertEquals(Result.class, results[0].getClass()); 135 assertEquals(exceptionByCaught.getClass(), results[1].getClass()); 136 Result result = table.get(new Get(Bytes.toBytes("good"))); 137 assertEquals(1, result.size()); 138 Cell cell = result.getColumnLatestCell(FAMILY, null); 139 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 140 } 141 } 142 143 /** 144 * This test verifies region exception doesn't corrupt the results of batch. The prescription is 145 * shown below. 1) honor the action result rather than region exception. If the action have both 146 * of true result and region exception, the action is fine as the exception is caused by other 147 * actions which are in the same region. 2) honor the action exception rather than region 148 * exception. If the action have both of action exception and region exception, we deal with the 149 * action exception only. If we also handle the region exception for the same action, it will 150 * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will 151 * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each 152 * result should be of IOE. Otherwise, the row operation which is not in the exception should have 153 * a true result. The no-cluster test is in TestAsyncProcessWithRegionException. 154 */ 155 @Test 156 public void testRegionExceptionByAsync() throws Exception { 157 List<Row> batches = new ArrayList<>(); 158 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); 159 // the rm is used to prompt the region exception. 160 // see RSRpcServices#multi 161 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 162 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 163 batches.add(rm); 164 try (AsyncConnection asyncConnection = 165 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 166 AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME); 167 List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches); 168 assertEquals(2, results.size()); 169 try { 170 results.get(1).get(); 171 fail("Where is the exception? We put the malformed cells!!!"); 172 } catch (ExecutionException e) { 173 // pass 174 } 175 Result result = table.get(new Get(Bytes.toBytes("good"))).get(); 176 assertEquals(1, result.size()); 177 Cell cell = result.getColumnLatestCell(FAMILY, null); 178 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 179 } 180 } 181 182 /** 183 * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed. 184 * Currently, we have no client api to submit the request consisting of condition-rm and mutation. 185 * Hence, this test build the request manually. 186 */ 187 @Test 188 public void testAtomicOperations() throws Exception { 189 RowMutations rm = new RowMutations(Bytes.toBytes("fail")); 190 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); 191 rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10])); 192 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]); 193 194 // build the request 195 HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 196 ClientProtos.MultiRequest request = 197 ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName())) 198 .addRegionAction(ClientProtos.RegionAction.newBuilder() 199 .setRegion(RequestConverter.buildRegionSpecifier( 200 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 201 r.getRegionInfo().getRegionName())) 202 .addAction(ClientProtos.Action.newBuilder().setMutation( 203 ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put)))) 204 .build(); 205 206 List<Cell> cells = new ArrayList<>(); 207 for (Mutation m : rm.getMutations()) { 208 cells.addAll(m.getCellList(FAMILY)); 209 } 210 cells.addAll(put.getCellList(FAMILY)); 211 assertEquals(3, cells.size()); 212 HBaseRpcController controller = Mockito.mock(HBaseRpcController.class); 213 Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells)); 214 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL 215 .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName())); 216 217 ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request); 218 assertEquals(2, response.getRegionActionResultCount()); 219 assertTrue(response.getRegionActionResultList().get(0).hasException()); 220 assertFalse(response.getRegionActionResultList().get(1).hasException()); 221 assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount()); 222 assertTrue( 223 response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult()); 224 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 225 Result result = table.get(new Get(Bytes.toBytes("good"))); 226 assertEquals(1, result.size()); 227 Cell cell = result.getColumnLatestCell(FAMILY, null); 228 assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); 229 } 230 } 231 232 private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName) 233 throws IOException { 234 ClientProtos.RegionAction.Builder builder = RequestConverter 235 .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName); 236 builder.setAtomic(true); 237 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 238 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 239 ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null, 240 CompareOperator.EQUAL, new byte[10], null, null); 241 for (Mutation mutation : rm.getMutations()) { 242 ClientProtos.MutationProto.MutationType mutateType = null; 243 if (mutation instanceof Put) { 244 mutateType = ClientProtos.MutationProto.MutationType.PUT; 245 } else if (mutation instanceof Delete) { 246 mutateType = ClientProtos.MutationProto.MutationType.DELETE; 247 } else { 248 throw new DoNotRetryIOException( 249 "RowMutations supports only put and delete, not " + mutation.getClass().getName()); 250 } 251 mutationBuilder.clear(); 252 ClientProtos.MutationProto mp = 253 ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder); 254 actionBuilder.clear(); 255 actionBuilder.setMutation(mp); 256 builder.addAction(actionBuilder.build()); 257 } 258 ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder() 259 .addRegionAction(builder.setCondition(condition).build()).build(); 260 return request; 261 } 262 263 /** 264 * This test depends on how regionserver process the batch ops. 1) group the put/delete until 265 * meeting the increment 2) process the batch of put/delete 3) process the increment see 266 * RSRpcServices#doNonAtomicRegionMutation 267 */ 268 @Test 269 public void testNonAtomicOperations() throws InterruptedException, IOException { 270 Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100); 271 List<Row> batches = new ArrayList<>(); 272 // the first and second puts will be group by regionserver 273 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 274 batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 275 // this Increment should succeed 276 batches.add(inc); 277 // this put should succeed 278 batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1])); 279 Object[] objs = new Object[batches.size()]; 280 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 281 table.batch(batches, objs); 282 fail("Where is the exception? We put the malformed cells!!!"); 283 } catch (RetriesExhaustedWithDetailsException e) { 284 assertEquals(2, e.getNumExceptions()); 285 for (int i = 0; i != e.getNumExceptions(); ++i) { 286 assertNotNull(e.getCause(i)); 287 assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass()); 288 assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); 289 } 290 } finally { 291 assertObjects(objs, batches.size()); 292 assertTrue(objs[0] instanceof IOException); 293 assertTrue(objs[1] instanceof IOException); 294 assertEquals(Result.class, objs[2].getClass()); 295 assertEquals(Result.class, objs[3].getClass()); 296 } 297 } 298 299 @Test 300 public void testRowMutations() throws InterruptedException, IOException { 301 Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]); 302 List<Row> batches = new ArrayList<>(); 303 RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); 304 // the first and second puts will be group by regionserver 305 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 306 mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); 307 batches.add(mutations); 308 // this bm should succeed 309 mutations = new RowMutations(Bytes.toBytes("good")); 310 mutations.add(put); 311 mutations.add(put); 312 batches.add(mutations); 313 Object[] objs = new Object[batches.size()]; 314 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 315 table.batch(batches, objs); 316 fail("Where is the exception? We put the malformed cells!!!"); 317 } catch (RetriesExhaustedWithDetailsException e) { 318 assertEquals(1, e.getNumExceptions()); 319 for (int i = 0; i != e.getNumExceptions(); ++i) { 320 assertNotNull(e.getCause(i)); 321 assertTrue(e.getCause(i) instanceof IOException); 322 assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); 323 } 324 } finally { 325 assertObjects(objs, batches.size()); 326 assertTrue(objs[0] instanceof IOException); 327 assertEquals(Result.class, objs[1].getClass()); 328 } 329 } 330 331 private static void assertObjects(Object[] objs, int expectedSize) { 332 int count = 0; 333 for (Object obj : objs) { 334 assertNotNull(obj); 335 ++count; 336 } 337 assertEquals(expectedSize, count); 338 } 339}