001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ExecutionException;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.ExtendedCell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.ipc.HBaseRpcController;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.HRegionServer;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.Before;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.mockito.Mockito;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
059import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
062
063/**
064 * The purpose of this test is to ensure whether rs deals with the malformed cells correctly.
065 */
066@Category({ MediumTests.class, ClientTests.class })
067public class TestMalformedCellFromClient {
068  private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
072
073  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
074  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
075  private static final int CELL_SIZE = 100;
076  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    // disable the retry
081    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
082    TEST_UTIL.startMiniCluster(1);
083  }
084
085  @Before
086  public void before() throws Exception {
087    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
088      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
089      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
090    TEST_UTIL.getConnection().getAdmin().createTable(desc);
091  }
092
093  @After
094  public void tearDown() throws Exception {
095    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
096      TEST_UTIL.deleteTable(htd.getTableName());
097    }
098  }
099
100  @AfterClass
101  public static void tearDownAfterClass() throws Exception {
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  /**
106   * The purpose of this ut is to check the consistency between the exception and results. If the
107   * RetriesExhaustedWithDetailsException contains the whole batch, each result should be of IOE.
108   * Otherwise, the row operation which is not in the exception should have a true result.
109   */
110  @Test
111  public void testRegionException() throws InterruptedException, IOException {
112    List<Row> batches = new ArrayList<>();
113    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
114    // the rm is used to prompt the region exception.
115    // see RSRpcServices#multi
116    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
117    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
118    batches.add(rm);
119    Object[] results = new Object[batches.size()];
120
121    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
122      Throwable exceptionByCaught = null;
123      try {
124        table.batch(batches, results);
125        fail("Where is the exception? We put the malformed cells!!!");
126      } catch (RetriesExhaustedException e) {
127        exceptionByCaught = e.getCause();
128      }
129      for (Object obj : results) {
130        assertNotNull(obj);
131      }
132      assertEquals(Result.class, results[0].getClass());
133      assertEquals(exceptionByCaught.getClass(), results[1].getClass());
134      Result result = table.get(new Get(Bytes.toBytes("good")));
135      assertEquals(1, result.size());
136      Cell cell = result.getColumnLatestCell(FAMILY, null);
137      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
138    }
139  }
140
141  /**
142   * This test verifies region exception doesn't corrupt the results of batch. The prescription is
143   * shown below. 1) honor the action result rather than region exception. If the action have both
144   * of true result and region exception, the action is fine as the exception is caused by other
145   * actions which are in the same region. 2) honor the action exception rather than region
146   * exception. If the action have both of action exception and region exception, we deal with the
147   * action exception only. If we also handle the region exception for the same action, it will
148   * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
149   * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
150   * result should be of IOE. Otherwise, the row operation which is not in the exception should have
151   * a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
152   */
153  @Test
154  public void testRegionExceptionByAsync() throws Exception {
155    List<Row> batches = new ArrayList<>();
156    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
157    // the rm is used to prompt the region exception.
158    // see RSRpcServices#multi
159    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
160    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
161    batches.add(rm);
162    try (AsyncConnection asyncConnection =
163      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
164      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
165      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
166      assertEquals(2, results.size());
167      try {
168        results.get(1).get();
169        fail("Where is the exception? We put the malformed cells!!!");
170      } catch (ExecutionException e) {
171        // pass
172      }
173      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
174      assertEquals(1, result.size());
175      Cell cell = result.getColumnLatestCell(FAMILY, null);
176      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
177    }
178  }
179
180  /**
181   * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
182   * Currently, we have no client api to submit the request consisting of condition-rm and mutation.
183   * Hence, this test build the request manually.
184   */
185  @Test
186  public void testAtomicOperations() throws Exception {
187    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
188    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
189    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
190    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
191
192    // build the request
193    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
194    ClientProtos.MultiRequest request =
195      ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
196        .addRegionAction(ClientProtos.RegionAction.newBuilder()
197          .setRegion(RequestConverter.buildRegionSpecifier(
198            HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
199            r.getRegionInfo().getRegionName()))
200          .addAction(ClientProtos.Action.newBuilder().setMutation(
201            ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
202        .build();
203
204    List<ExtendedCell> cells = new ArrayList<>();
205    for (Mutation m : rm.getMutations()) {
206      cells.addAll(m.getCellList(FAMILY));
207    }
208    cells.addAll(put.getCellList(FAMILY));
209    assertEquals(3, cells.size());
210    HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
211    Mockito.when(controller.cellScanner())
212      .thenReturn(PrivateCellUtil.createExtendedCellScanner(cells));
213    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(TEST_UTIL
214      .getMiniHBaseCluster().getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
215
216    ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
217    assertEquals(2, response.getRegionActionResultCount());
218    assertTrue(response.getRegionActionResultList().get(0).hasException());
219    assertFalse(response.getRegionActionResultList().get(1).hasException());
220    assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
221    assertTrue(
222      response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
223    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
224      Result result = table.get(new Get(Bytes.toBytes("good")));
225      assertEquals(1, result.size());
226      Cell cell = result.getColumnLatestCell(FAMILY, null);
227      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
228    }
229  }
230
231  private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
232    throws IOException {
233    ClientProtos.RegionAction.Builder builder = RequestConverter
234      .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
235    builder.setAtomic(true);
236    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
237    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
238    ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
239      CompareOperator.EQUAL, new byte[10], null, null);
240    for (Mutation mutation : rm.getMutations()) {
241      ClientProtos.MutationProto.MutationType mutateType = null;
242      if (mutation instanceof Put) {
243        mutateType = ClientProtos.MutationProto.MutationType.PUT;
244      } else if (mutation instanceof Delete) {
245        mutateType = ClientProtos.MutationProto.MutationType.DELETE;
246      } else {
247        throw new DoNotRetryIOException(
248          "RowMutations supports only put and delete, not " + mutation.getClass().getName());
249      }
250      mutationBuilder.clear();
251      ClientProtos.MutationProto mp =
252        ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
253      actionBuilder.clear();
254      actionBuilder.setMutation(mp);
255      builder.addAction(actionBuilder.build());
256    }
257    ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder()
258      .addRegionAction(builder.setCondition(condition).build()).build();
259    return request;
260  }
261
262  /**
263   * This test depends on how regionserver process the batch ops. 1) group the put/delete until
264   * meeting the increment 2) process the batch of put/delete 3) process the increment see
265   * RSRpcServices#doNonAtomicRegionMutation
266   */
267  @Test
268  public void testNonAtomicOperations() throws InterruptedException, IOException {
269    Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
270    List<Row> batches = new ArrayList<>();
271    // the first and second puts will be group by regionserver
272    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
273    batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
274    // this Increment should succeed
275    batches.add(inc);
276    // this put should succeed
277    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
278    Object[] objs = new Object[batches.size()];
279    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
280      table.batch(batches, objs);
281      fail("Where is the exception? We put the malformed cells!!!");
282    } catch (RetriesExhaustedException e) {
283      Throwable error = e.getCause();
284      for (;;) {
285        assertNotNull("Can not find a DoNotRetryIOException on stack trace", error);
286        if (error instanceof DoNotRetryIOException) {
287          break;
288        }
289        error = error.getCause();
290      }
291    } finally {
292      assertObjects(objs, batches.size());
293      assertTrue(objs[0] instanceof IOException);
294      assertTrue(objs[1] instanceof IOException);
295      assertEquals(Result.class, objs[2].getClass());
296      assertEquals(Result.class, objs[3].getClass());
297    }
298  }
299
300  @Test
301  public void testRowMutations() throws InterruptedException, IOException {
302    Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
303    List<Row> batches = new ArrayList<>();
304    RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
305    // the first and second puts will be group by regionserver
306    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
307    mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
308    batches.add(mutations);
309    // this bm should succeed
310    mutations = new RowMutations(Bytes.toBytes("good"));
311    mutations.add(put);
312    mutations.add(put);
313    batches.add(mutations);
314    Object[] objs = new Object[batches.size()];
315    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
316      table.batch(batches, objs);
317      fail("Where is the exception? We put the malformed cells!!!");
318    } catch (RetriesExhaustedException e) {
319      Throwable error = e.getCause();
320      for (;;) {
321        assertNotNull("Can not find a DoNotRetryIOException on stack trace", error);
322        if (error instanceof DoNotRetryIOException) {
323          break;
324        }
325        error = error.getCause();
326      }
327    } finally {
328      assertObjects(objs, batches.size());
329      assertTrue(objs[0] instanceof IOException);
330      assertEquals(Result.class, objs[1].getClass());
331    }
332  }
333
334  private static void assertObjects(Object[] objs, int expectedSize) {
335    int count = 0;
336    for (Object obj : objs) {
337      assertNotNull(obj);
338      ++count;
339    }
340    assertEquals(expectedSize, count);
341  }
342}