/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
 * injects delays to get, scan and mutate operations.
 * <p>
 * When 'hbase.client.operation.timeout' is set and a client operation is not completed in time,
 * the client will retry the operation 'hbase.client.retries.number' times. The tests in this
 * class then expect a {@link RetriesExhaustedException} whose cause is a
 * {@link CallTimeoutException}.
 * <p>
 * NOTE(review): this comment used to name {@link SocketTimeoutException} as the exception that is
 * thrown; none of the assertions in this class check for it.
 * <p>
 * Using the 'hbase.client.scanner.timeout.period' configuration property, similar behavior can be
 * specified for scan related operations such as openScanner() and next(). If that times out,
 * {@link RetriesExhaustedException} will be thrown.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestClientOperationTimeout {

  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationTimeout.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Client side operation, meta operation and scanner timeout used by {@link #CONN}. */
  private static final int CLIENT_TIMEOUT_MS = 500;

  /** Server side delay injected by the tests; deliberately larger than the client timeout. */
  private static final int SERVER_DELAY_MS = 600;

  /** Value used for 'hbase.client.retries.number' on {@link #CONN}. */
  private static final int CLIENT_RETRIES = 1;

  // Activate the delays after table creation to test get/scan/put.
  // These are written by the JUnit thread and read by the region server RPC handler threads in
  // DelayedRSRpcServices, so they are volatile to guarantee that a new value is visible there.
  private static volatile int DELAY_GET;
  private static volatile int DELAY_SCAN;
  private static volatile int DELAY_MUTATE;
  private static volatile int DELAY_BATCH_MUTATE;

  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
  private static final byte[] VALUE = Bytes.toBytes("value");

  private static Connection CONN;
  private static Table TABLE;

  /**
   * Starts a mini cluster whose region servers are {@link DelayedRegionServer}s, creates the test
   * table and opens a client connection configured with short timeouts and a single retry.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    // Set RegionServer class and use default values for other options.
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().rsClass(DelayedRegionServer.class).build();
    UTIL.startMiniCluster(option);
    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());

    // Work on a copy so that the short timeouts only apply to the connection under test and not
    // to the configuration held by the testing utility.
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CLIENT_TIMEOUT_MS);
    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, CLIENT_TIMEOUT_MS);
    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT_MS);
    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
    CONN = ConnectionFactory.createConnection(conf);
    TABLE = CONN.getTable(TABLE_NAME);
  }

  /**
   * Closes the table and the connection, swallowing any IOException raised while closing, and
   * then shuts the mini cluster down.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    Closeables.close(TABLE, true);
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Resets every injected delay to zero so that each test only enables the delay it needs.
   */
  @Before
  public void setUpBeforeTest() throws Exception {
    DELAY_GET = 0;
    DELAY_SCAN = 0;
    DELAY_MUTATE = 0;
    DELAY_BATCH_MUTATE = 0;
  }

  /**
   * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
   * longer than 'hbase.client.operation.timeout'.
   */
  @Test
  public void testGetTimeout() {
    DELAY_GET = SERVER_DELAY_MS;
    try {
      TABLE.get(new Get(ROW));
      fail("should not reach here");
    } catch (Exception e) {
      LOG.info("Got exception for get", e);
      assertThat(e, instanceOf(RetriesExhaustedException.class));
      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
    }
  }

  /**
   * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
   * longer than 'hbase.client.operation.timeout'.
   */
  @Test
  public void testPutTimeout() {
    DELAY_MUTATE = SERVER_DELAY_MS;
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, VALUE);
    try {
      TABLE.put(put);
      fail("should not reach here");
    } catch (Exception e) {
      LOG.info("Got exception for put", e);
      assertThat(e, instanceOf(RetriesExhaustedException.class));
      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
    }
  }

  /**
   * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the
   * operation takes longer than 'hbase.client.operation.timeout'.
   */
  @Test
  public void testMultiPutsTimeout() {
    DELAY_BATCH_MUTATE = SERVER_DELAY_MS;
    Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
    Put put2 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
    List<Put> puts = Arrays.asList(put1, put2);
    try {
      // The results array is sized from the batch itself so the two cannot drift apart.
      TABLE.batch(puts, new Object[puts.size()]);
      fail("should not reach here");
    } catch (Exception e) {
      LOG.info("Got exception for batch", e);
      // For batch the timeout is expected one level deeper than for single operations.
      assertThat(e, instanceOf(RetriesExhaustedException.class));
      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
      assertThat(e.getCause().getCause(), instanceOf(CallTimeoutException.class));
    }
  }

  /**
   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
   * longer than 'hbase.client.scanner.timeout.period'.
   */
  @Test
  public void testScanTimeout() throws IOException, InterruptedException {
    // cache the region location.
    try (RegionLocator locator = TABLE.getRegionLocator()) {
      locator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
    }
    // sleep a bit to make sure the location has been cached as it is an async operation.
    Thread.sleep(100);
    DELAY_SCAN = SERVER_DELAY_MS;
    try (ResultScanner scanner = TABLE.getScanner(new Scan())) {
      scanner.next();
      fail("should not reach here");
    } catch (Exception e) {
      LOG.info("Got exception for scan", e);
      assertThat(e, instanceOf(RetriesExhaustedException.class));
      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
    }
  }

  /**
   * Region server used by the mini cluster of this test; it only differs from its parent in that
   * it installs {@link DelayedRSRpcServices} as its RPC services.
   */
  public static final class DelayedRegionServer
    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new DelayedRSRpcServices(this);
    }
  }

  /**
   * This {@link RSRpcServices} class injects a delay into get, mutate, scan and multi RPC calls
   * and afterwards executes the corresponding super method.
   */
  private static final class DelayedRSRpcServices extends RSRpcServices {
    DelayedRSRpcServices(HRegionServer rs) throws IOException {
      super(rs);
    }

    /**
     * Sleeps for the given number of milliseconds before the real RPC implementation runs.
     * <p>
     * If the sleep is interrupted the event is logged and the interrupt status of the thread is
     * restored, so that code further up the stack can still observe the interruption.
     * @param delayMs   how long to sleep, in milliseconds; zero means no artificial delay
     * @param operation name of the RPC being delayed, only used in the log message
     */
    private static void injectDelay(int delayMs, String operation) {
      try {
        Thread.sleep(delayMs);
      } catch (InterruptedException e) {
        LOG.error("Sleep interrupted during {} operation", operation, e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
      throws ServiceException {
      injectDelay(DELAY_GET, "get");
      return super.get(controller, request);
    }

    @Override
    public ClientProtos.MutateResponse mutate(RpcController rpcc,
      ClientProtos.MutateRequest request) throws ServiceException {
      injectDelay(DELAY_MUTATE, "mutate");
      return super.mutate(rpcc, request);
    }

    @Override
    public ClientProtos.ScanResponse scan(RpcController controller,
      ClientProtos.ScanRequest request) throws ServiceException {
      injectDelay(DELAY_SCAN, "scan");
      return super.scan(controller, request);
    }

    @Override
    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
      throws ServiceException {
      injectDelay(DELAY_BATCH_MUTATE, "multi");
      return super.multi(rpcc, request);
    }
  }
}