/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Exercises client-side scanner timeout and retry handling against a single-node mini cluster whose
 * region server uses {@link RSRpcServicesWithScanTimeout}, an RPC service that can be told to
 * sleep or to throw {@link OutOfOrderScannerNextException} on selected scan calls. Every scenario
 * is run once through the blocking {@link Table} API and once through {@link AsyncTable}.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestClientScannerTimeouts {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static AsyncConnection ASYNC_CONN;
  private static Connection CONN;
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final byte[] VALUE = Bytes.toBytes("testValue");

  private static final byte[] ROW0 = Bytes.toBytes("row-0");
  private static final byte[] ROW1 = Bytes.toBytes("row-1");
  private static final byte[] ROW2 = Bytes.toBytes("row-2");
  private static final byte[] ROW3 = Bytes.toBytes("row-3");

  // All timeouts below are in milliseconds; they are fed to the configuration and, via
  // RSRpcServicesWithScanTimeout.sleepTime, to Thread.sleep().
  private static final int rpcTimeout = 1000;
  private static final int scanTimeout = 3 * rpcTimeout;
  private static final int metaScanTimeout = 6 * rpcTimeout;
  private static final int CLIENT_RETRIES_NUMBER = 3;

  /** Conversion factor used to compare {@link System#nanoTime()} deltas with ms timeouts. */
  private static final long NANOS_PER_MILLI = 1_000_000L;

  private static Table table;
  private static AsyncTable<AdvancedScanResultConsumer> asyncTable;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a one-node mini cluster running {@link RegionServerWithScanTimeout}, then creates the
   * shared client connections. The scanner and meta timeouts are set on the configuration only
   * after the cluster has started and before the connections are created.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Don't report so often so easier to see other rpcs
    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
    TEST_UTIL.startMiniCluster(1);

    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
    CONN = ASYNC_CONN.toConnection();
  }

  /** Closes the shared connections and shuts the mini cluster down. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    CONN.close();
    ASYNC_CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a table named after the running test method, writes {@link #ROW0}..{@link #ROW3} to it
   * and resets the fault-injection state of {@link RSRpcServicesWithScanTimeout}.
   * @param isSystemTable when true the table is created in the system namespace, so that scans on
   *                      it are treated as meta/system scans by the tests that pass
   *                      {@link #metaScanTimeout}
   */
  public void setup(boolean isSystemTable) throws IOException {
    RSRpcServicesWithScanTimeout.reset();

    String nameAsString = name.getMethodName();
    if (isSystemTable) {
      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
    }
    final TableName tableName = TableName.valueOf(nameAsString);

    TEST_UTIL.createTable(tableName, FAMILY);
    table = CONN.getTable(tableName);
    asyncTable = ASYNC_CONN.getTable(tableName);
    putToTable(table, ROW0);
    putToTable(table, ROW1);
    putToTable(table, ROW2);
    putToTable(table, ROW3);
    LOG.info("Wrote our four values");

    // NOTE(review): presumably warms the region location cache so that later scans do not need a
    // meta lookup while faults are being injected - confirm
    table.getRegionLocator().getAllRegionLocations();

    // reset again in case the creation/population caused anything to trigger
    RSRpcServicesWithScanTimeout.reset();
  }

  /** Asserts that {@code result} holds exactly the row key {@code expected}. */
  private void expectRow(byte[] expected, Result result) {
    assertTrue("Expected row: " + Bytes.toString(expected),
      Bytes.equals(expected, result.getRow()));
  }

  /**
   * Asserts the number of injected faults counted by the server so far, then zeroes the counter so
   * the next check starts fresh.
   */
  private void expectNumTries(int expected) {
    assertEquals(
      "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
      expected, RSRpcServicesWithScanTimeout.tryNumber);
    // reset for next
    RSRpcServicesWithScanTimeout.tryNumber = 0;
  }

  /**
   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
   * erroneous next() call. This is relatively hard to do these days because the server attempts to
   * always return before the timeout. In this test we force the server to throw this exception, so
   * that we can test the retry logic appropriately.
   */
  @Test
  public void testRetryOutOfOrderScannerNextException() throws IOException {
    expectRetryOutOfOrderScannerNext(this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
    expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
  }

  /**
   * verify that we honor the {@link HConstants#HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD} for normal
   * scans.
   */
  @Test
  public void testNormalScanTimeoutOnNext() throws IOException {
    setup(false);
    expectTimeoutOnNext(scanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testNormalScanTimeoutOnNextAsync() throws IOException {
    setup(false);
    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
   * normal scans
   */
  @Test
  public void testNormalScanTimeoutOnOpenScanner() throws IOException {
    setup(false);
    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
    setup(false);
    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
   * next() calls in meta scans
   */
  @Test
  public void testMetaScanTimeoutOnNext() throws IOException {
    setup(true);
    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testMetaScanTimeoutOnNextAsync() throws IOException {
    setup(true);
    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
   * openScanner() calls for meta scans
   */
  @Test
  public void testMetaScanTimeoutOnOpenScanner() throws IOException {
    setup(true);
    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
    setup(true);
    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
  }

  /**
   * Scans the four rows twice: first with the server throwing
   * {@link OutOfOrderScannerNextException} once (on call sequence 1), then with the server throwing
   * on every next() call. In both passes every row must be returned exactly once and in order.
   * @param scannerSupplier opens a fresh scanner (blocking or async flavour) on the test table
   */
  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
    throws IOException {
    setup(false);
    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;

    LOG.info(
      "Opening scanner, expecting no errors from first next() call from openScanner response");
    ResultScanner scanner = scannerSupplier.get();
    Result result = scanner.next();
    expectRow(ROW0, result);
    expectNumTries(0);

    LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
    result = scanner.next();
    expectRow(ROW1, result);
    expectNumTries(0);

    LOG.info(
      "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
    result = scanner.next();
    expectRow(ROW2, result);
    expectNumTries(1);

    // reset so no errors. since last call restarted the scan and following
    // call would otherwise fail
    RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;

    LOG.info("Finishing scan, expecting no errors");
    result = scanner.next();
    expectRow(ROW3, result);
    scanner.close();

    LOG.info("Testing always throw exception");
    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
    int i = 0;

    // test the case that RPC always throws
    scanner = scannerSupplier.get();
    RSRpcServicesWithScanTimeout.throwAlways = true;

    while (true) {
      LOG.info("Calling scanner.next()");
      result = scanner.next();
      if (result == null) {
        break;
      } else {
        byte[] expectedResult = expectedResults[i++];
        expectRow(expectedResult, result);
      }
    }

    // ensure we verified all rows. this along with the expectRow check above
    // proves that we didn't miss any rows.
    assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
      + ", actual index=" + i, expectedResults.length, i);

    // expect all but the first row (which came from initial openScanner) to have thrown an error
    expectNumTries(expectedResults.length - 1);

    // release the second scanner as well, mirroring the explicit close of the first one
    scanner.close();
  }

  /**
   * Makes the server sleep past {@code timeout} on the next() call with sequence number 1 and
   * verifies that the client fails with a {@link CallTimeoutException} (wrapped in
   * {@link RetriesExhaustedException}) no earlier than {@code timeout} ms after the call started.
   * @param timeout         the client-side timeout, in milliseconds, that is expected to apply
   * @param scannerSupplier opens a fresh scanner on the test table
   */
  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
    throws IOException {
    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);

    LOG.info(
      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
    ResultScanner scanner = scannerSupplier.get();
    Result result = scanner.next();
    expectRow(ROW0, result);

    LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
    result = scanner.next();
    expectRow(ROW1, result);

    LOG.info("Making second next() RPC, expecting timeout");
    long start = System.nanoTime();
    try {
      scanner.next();
      fail("Expected CallTimeoutException");
    } catch (RetriesExhaustedException e) {
      assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException);
    }
    expectTimeout(start, timeout);
  }

  /**
   * Makes the server sleep past {@code timeout} while opening the scanner and verifies that the
   * client fails with a {@link CallTimeoutException} (wrapped in {@link RetriesExhaustedException})
   * no earlier than {@code timeout} ms after the call started.
   * @param timeout         the client-side timeout, in milliseconds, that is expected to apply
   * @param scannerSupplier opens a fresh scanner on the test table
   */
  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
    throws IOException {
    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
    long start = System.nanoTime();
    try {
      scannerSupplier.get().next();
      fail("Expected CallTimeoutException");
    } catch (RetriesExhaustedException e) {
      assertTrue("Expected CallTimeoutException, but was " + e.getCause(),
        e.getCause() instanceof CallTimeoutException);
    }
    expectTimeout(start, timeout);
  }

  /**
   * Asserts that at least {@code timeout} milliseconds have elapsed since {@code start}.
   * @param start   a {@link System#nanoTime()} reading taken before the timed call
   * @param timeout the minimum expected elapsed time, in milliseconds
   */
  private void expectTimeout(long start, int timeout) {
    // nanoTime() deltas are in nanoseconds; convert to ms so the comparison with the ms timeout is
    // meaningful (comparing raw nanos against ms would make this assertion always pass)
    long durationMs = (System.nanoTime() - start) / NANOS_PER_MILLI;
    assertTrue("Expected duration >= " + timeout + "ms, but was " + durationMs + "ms",
      durationMs >= timeout);
  }

  /** Opens a blocking scanner over the whole test table with caching set to 1. */
  private ResultScanner getScanner() {
    Scan scan = new Scan();
    scan.setCaching(1);
    try {
      return table.getScanner(scan);
    } catch (IOException e) {
      // Supplier#get cannot throw checked exceptions
      throw new RuntimeException(e);
    }
  }

  /** Opens an {@link AsyncTable}-backed scanner over the whole test table with caching set to 1. */
  private ResultScanner getAsyncScanner() {
    Scan scan = new Scan();
    scan.setCaching(1);
    return asyncTable.getScanner(scan);
  }

  /** Writes a single cell ({@link #FAMILY}:{@link #QUALIFIER} = {@link #VALUE}) for the row. */
  private void putToTable(Table ht, byte[] rowkey) throws IOException {
    Put put = new Put(rowkey);
    put.addColumn(FAMILY, QUALIFIER, VALUE);
    ht.put(put);
  }

  /** Region server that installs {@link RSRpcServicesWithScanTimeout} as its RPC services. */
  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
    public RegionServerWithScanTimeout(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServicesWithScanTimeout(this);
    }
  }

  /**
   * RPC services that delegate every scan to the real implementation and then, depending on the
   * static switches below, either sleep or throw {@link OutOfOrderScannerNextException}. Faults
   * are only injected for the scanner most recently opened on a non-meta region.
   * <p>
   * NOTE(review): these switches are written by the test thread and read by RPC handler threads,
   * yet only {@code slept} is volatile - consider making the others volatile as well.
   */
  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
    // id of the last scanner opened on a region whose name does not contain the meta table name
    private long tableScannerId;

    private static long seqNoToThrowOn = -1;
    private static boolean throwAlways = false;
    private static boolean threw;

    private static long seqNoToSleepOn = -1;
    private static boolean sleepOnOpen = false;
    private static volatile boolean slept;
    private static int tryNumber = 0;

    private static int sleepTime = rpcTimeout + 500;

    /** Sets the injected sleep to {@code timeout} plus a 500 ms margin. */
    public static void setSleepForTimeout(int timeout) {
      sleepTime = timeout + 500;
    }

    /** Restores every fault-injection switch and counter to its "no faults" default. */
    public static void reset() {
      setSleepForTimeout(scanTimeout);

      seqNoToSleepOn = -1;
      seqNoToThrowOn = -1;
      throwAlways = false;
      threw = false;
      sleepOnOpen = false;
      slept = false;
      tryNumber = 0;
    }

    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
      super(rs);
    }

    /**
     * Runs the real scan first and then applies the configured fault, if any. Requests carrying a
     * scanner id are next()/close calls on an existing scanner; requests without one open a new
     * scanner.
     */
    @Override
    public ScanResponse scan(final RpcController controller, final ScanRequest request)
      throws ServiceException {
      if (!request.hasScannerId()) {
        return openScanner(controller, request);
      }

      ScanResponse scanResponse = super.scan(controller, request);
      // never interfere with other scanners (e.g. meta) or with close requests
      if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
        return scanResponse;
      }

      boolean throwOnThisSeq =
        !threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq();
      if (throwAlways || throwOnThisSeq) {
        threw = true;
        tryNumber++;
        LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
          tableScannerId);
        throw new ServiceException(new OutOfOrderScannerNextException());
      }

      if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
        LOG.info("SLEEPING {}", sleepTime);
        sleepForConfiguredTime();
        slept = true;
        tryNumber++;
      }
      return scanResponse;
    }

    /** Handles a scan request without a scanner id, optionally sleeping after the real open. */
    private ScanResponse openScanner(RpcController controller, ScanRequest request)
      throws ServiceException {
      ScanResponse scanRes = super.scan(controller, request);
      String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
      if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
        tableScannerId = scanRes.getScannerId();
        if (sleepOnOpen) {
          LOG.info("openScanner SLEEPING {}", sleepTime);
          sleepForConfiguredTime();
        }
      }
      return scanRes;
    }

    /** Sleeps for {@link #sleepTime} ms; an interrupt ends the sleep early and is preserved. */
    private static void sleepForConfiguredTime() {
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        // keep the interrupt visible to the handler thread instead of silently dropping it
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while sleeping to simulate a slow scan", e);
      }
    }
  }
}