001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyInt; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.util.Collections; 034import java.util.Iterator; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Executors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellScanner; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.RegionLocations; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 046import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.After; 050import org.junit.Before; 051import org.junit.ClassRule; 052import org.junit.Rule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.junit.rules.TestName; 056import org.mockito.InOrder; 057import org.mockito.Mockito; 058import org.mockito.invocation.InvocationOnMock; 059import org.mockito.stubbing.Answer; 060 061/** 062 * Test the ClientScanner. 063 */ 064@Category(SmallTests.class) 065public class TestClientScanner { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestClientScanner.class); 070 071 Scan scan; 072 ExecutorService pool; 073 Configuration conf; 074 ConnectionConfiguration connectionConfig; 075 076 ClusterConnection clusterConn; 077 RpcRetryingCallerFactory rpcFactory; 078 RpcControllerFactory controllerFactory; 079 080 @Rule 081 public TestName name = new TestName(); 082 083 @Before 084 public void setup() throws IOException { 085 clusterConn = Mockito.mock(ClusterConnection.class); 086 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); 087 controllerFactory = Mockito.mock(RpcControllerFactory.class); 088 pool = Executors.newSingleThreadExecutor(); 089 scan = new Scan(); 090 conf = new Configuration(); 091 connectionConfig = new ConnectionConfiguration(conf); 092 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); 093 Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig); 094 } 095 096 @After 097 public void teardown() { 098 if (null != pool) { 099 pool.shutdownNow(); 100 } 101 } 102 103 private static class MockClientScanner extends ClientSimpleScanner { 104 105 private boolean rpcFinished = false; 106 private boolean rpcFinishedFired = false; 107 private boolean initialized = false; 108 109 public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, 110 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, 111 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, 112 ConnectionConfiguration connectionConfig) throws IOException { 113 super(conf, scan, scan, tableName, connection, rpcFactory, controllerFactory, pool, 114 HConstants.DEFAULT_HBASE_RPC_TIMEOUT, 115 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout, 116 connectionConfig, Collections.emptyMap()); 117 } 118 119 @Override 120 protected boolean moveToNextRegion() { 121 if (!initialized) { 122 initialized = true; 123 return super.moveToNextRegion(); 124 } 125 if (!rpcFinished) { 126 return super.moveToNextRegion(); 127 } 128 // Enforce that we don't short-circuit more than once 129 if (rpcFinishedFired) { 130 throw new RuntimeException( 131 "Expected nextScanner to only be called once after " + " short-circuit was triggered."); 132 } 133 rpcFinishedFired = true; 134 return false; 135 } 136 137 public void setRpcFinished(boolean rpcFinished) { 138 this.rpcFinished = rpcFinished; 139 } 140 } 141 142 @Test 143 @SuppressWarnings("unchecked") 144 public void testNoResultsHint() throws IOException { 145 final Result[] results = new Result[1]; 146 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 147 KeyValue.Type.Maximum); 148 results[0] = Result.create(new Cell[] { kv1 }); 149 150 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 151 152 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 153 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 154 .thenAnswer(new Answer<Result[]>() { 155 private int count = 0; 156 157 @Override 158 public Result[] answer(InvocationOnMock invocation) throws Throwable { 159 ScannerCallableWithReplicas callable = invocation.getArgument(0); 160 switch (count) { 161 case 0: // initialize 162 count++; 163 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN); 164 return results; 165 case 1: // detect no more results 166 case 2: // close 167 count++; 168 return new Result[0]; 169 default: 170 throw new RuntimeException("Expected only 2 invocations"); 171 } 172 } 173 }); 174 175 // Set a much larger cache and buffer size than we'll provide 176 scan.setCaching(100); 177 scan.setMaxResultSize(1000 * 1000); 178 179 try (MockClientScanner scanner = 180 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 181 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { 182 183 scanner.setRpcFinished(true); 184 185 InOrder inOrder = Mockito.inOrder(caller); 186 187 scanner.loadCache(); 188 189 // One for fetching the results 190 // One for fetching empty results and quit as we do not have moreResults hint. 191 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 192 193 assertEquals(1, scanner.cache.size()); 194 Result r = scanner.cache.poll(); 195 assertNotNull(r); 196 CellScanner cs = r.cellScanner(); 197 assertTrue(cs.advance()); 198 assertEquals(kv1, cs.current()); 199 assertFalse(cs.advance()); 200 } 201 } 202 203 @Test 204 @SuppressWarnings("unchecked") 205 public void testSizeLimit() throws IOException { 206 final Result[] results = new Result[1]; 207 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 208 KeyValue.Type.Maximum); 209 results[0] = Result.create(new Cell[] { kv1 }); 210 211 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 212 213 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 214 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 215 .thenAnswer(new Answer<Result[]>() { 216 private int count = 0; 217 218 @Override 219 public Result[] answer(InvocationOnMock invocation) throws Throwable { 220 ScannerCallableWithReplicas callable = invocation.getArgument(0); 221 switch (count) { 222 case 0: // initialize 223 count++; 224 // if we set no here the implementation will trigger a close 225 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 226 return results; 227 case 1: // close 228 count++; 229 return null; 230 default: 231 throw new RuntimeException("Expected only 2 invocations"); 232 } 233 } 234 }); 235 236 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 237 238 // Set a much larger cache 239 scan.setCaching(100); 240 // The single key-value will exit the loop 241 scan.setMaxResultSize(1); 242 243 try (MockClientScanner scanner = 244 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 245 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { 246 InOrder inOrder = Mockito.inOrder(caller); 247 248 scanner.loadCache(); 249 250 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 251 252 assertEquals(1, scanner.cache.size()); 253 Result r = scanner.cache.poll(); 254 assertNotNull(r); 255 CellScanner cs = r.cellScanner(); 256 assertTrue(cs.advance()); 257 assertEquals(kv1, cs.current()); 258 assertFalse(cs.advance()); 259 } 260 } 261 262 @Test 263 @SuppressWarnings("unchecked") 264 public void testCacheLimit() throws IOException { 265 KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 266 KeyValue.Type.Maximum); 267 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 268 KeyValue.Type.Maximum); 269 KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 270 KeyValue.Type.Maximum); 271 final Result[] results = new Result[] { Result.create(new Cell[] { kv1 }), 272 Result.create(new Cell[] { kv2 }), Result.create(new Cell[] { kv3 }) }; 273 274 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 275 276 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 277 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 278 .thenAnswer(new Answer<Result[]>() { 279 private int count = 0; 280 281 @Override 282 public Result[] answer(InvocationOnMock invocation) throws Throwable { 283 ScannerCallableWithReplicas callable = invocation.getArgument(0); 284 switch (count) { 285 case 0: // initialize 286 count++; 287 // if we set no here the implementation will trigger a close 288 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 289 return results; 290 case 1: // close 291 count++; 292 return null; 293 default: 294 throw new RuntimeException("Expected only 2 invocations"); 295 } 296 } 297 }); 298 299 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 300 301 // Set a small cache 302 scan.setCaching(1); 303 // Set a very large size 304 scan.setMaxResultSize(1000 * 1000); 305 306 try (MockClientScanner scanner = 307 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 308 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { 309 InOrder inOrder = Mockito.inOrder(caller); 310 311 scanner.loadCache(); 312 313 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 314 315 assertEquals(3, scanner.cache.size()); 316 Result r = scanner.cache.poll(); 317 assertNotNull(r); 318 CellScanner cs = r.cellScanner(); 319 assertTrue(cs.advance()); 320 assertEquals(kv1, cs.current()); 321 assertFalse(cs.advance()); 322 323 r = scanner.cache.poll(); 324 assertNotNull(r); 325 cs = r.cellScanner(); 326 assertTrue(cs.advance()); 327 assertEquals(kv2, cs.current()); 328 assertFalse(cs.advance()); 329 330 r = scanner.cache.poll(); 331 assertNotNull(r); 332 cs = r.cellScanner(); 333 assertTrue(cs.advance()); 334 assertEquals(kv3, cs.current()); 335 assertFalse(cs.advance()); 336 } 337 } 338 339 @Test 340 @SuppressWarnings("unchecked") 341 public void testNoMoreResults() throws IOException { 342 final Result[] results = new Result[1]; 343 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 344 KeyValue.Type.Maximum); 345 results[0] = Result.create(new Cell[] { kv1 }); 346 347 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 348 349 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 350 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 351 .thenAnswer(new Answer<Result[]>() { 352 private int count = 0; 353 354 @Override 355 public Result[] answer(InvocationOnMock invocation) throws Throwable { 356 ScannerCallableWithReplicas callable = invocation.getArgument(0); 357 switch (count) { 358 case 0: // initialize 359 count++; 360 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 361 return results; 362 case 1: // close 363 count++; 364 return null; 365 default: 366 throw new RuntimeException("Expected only 2 invocations"); 367 } 368 } 369 }); 370 371 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 372 373 // Set a much larger cache and buffer size than we'll provide 374 scan.setCaching(100); 375 scan.setMaxResultSize(1000 * 1000); 376 377 try (MockClientScanner scanner = 378 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 379 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { 380 scanner.setRpcFinished(true); 381 382 InOrder inOrder = Mockito.inOrder(caller); 383 384 scanner.loadCache(); 385 386 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 387 388 assertEquals(1, scanner.cache.size()); 389 Result r = scanner.cache.poll(); 390 assertNotNull(r); 391 CellScanner cs = r.cellScanner(); 392 assertTrue(cs.advance()); 393 assertEquals(kv1, cs.current()); 394 assertFalse(cs.advance()); 395 } 396 } 397 398 @Test 399 @SuppressWarnings("unchecked") 400 public void testMoreResults() throws IOException { 401 final Result[] results1 = new Result[1]; 402 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 403 KeyValue.Type.Maximum); 404 results1[0] = Result.create(new Cell[] { kv1 }); 405 406 final Result[] results2 = new Result[1]; 407 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 408 KeyValue.Type.Maximum); 409 results2[0] = Result.create(new Cell[] { kv2 }); 410 411 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 412 413 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 414 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 415 .thenAnswer(new Answer<Result[]>() { 416 private int count = 0; 417 418 @Override 419 public Result[] answer(InvocationOnMock invocation) throws Throwable { 420 ScannerCallableWithReplicas callable = invocation.getArgument(0); 421 switch (count) { 422 case 0: // initialize 423 count++; 424 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 425 return results1; 426 case 1: 427 count++; 428 // The server reports back false WRT more results 429 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 430 return results2; 431 case 2: // close 432 count++; 433 return null; 434 default: 435 throw new RuntimeException("Expected only 3 invocations"); 436 } 437 } 438 }); 439 440 // Set a much larger cache and buffer size than we'll provide 441 scan.setCaching(100); 442 scan.setMaxResultSize(1000 * 1000); 443 444 try (MockClientScanner scanner = 445 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 446 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { 447 InOrder inOrder = Mockito.inOrder(caller); 448 scanner.setRpcFinished(true); 449 450 scanner.loadCache(); 451 452 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 453 454 assertEquals(2, scanner.cache.size()); 455 Result r = scanner.cache.poll(); 456 assertNotNull(r); 457 CellScanner cs = r.cellScanner(); 458 assertTrue(cs.advance()); 459 assertEquals(kv1, cs.current()); 460 assertFalse(cs.advance()); 461 462 r = scanner.cache.poll(); 463 assertNotNull(r); 464 cs = r.cellScanner(); 465 assertTrue(cs.advance()); 466 assertEquals(kv2, cs.current()); 467 assertFalse(cs.advance()); 468 } 469 } 470 471 /** 472 * Tests the case where all replicas of a region throw an exception. It should not cause a hang 473 * but the exception should propagate to the client 474 */ 475 @Test 476 public void testExceptionsFromReplicasArePropagated() throws IOException { 477 scan.setConsistency(Consistency.TIMELINE); 478 479 // Mock a caller which calls the callable for ScannerCallableWithReplicas, 480 // but throws an exception for the actual scanner calls via callWithRetries. 481 rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig); 482 conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, 483 MockRpcRetryingCallerFactory.class.getName()); 484 485 // mock 3 replica locations 486 when(clusterConn.locateRegion((TableName) any(), (byte[]) any(), anyBoolean(), anyBoolean(), 487 anyInt())).thenReturn(new RegionLocations(null, null, null)); 488 489 try (MockClientScanner scanner = 490 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 491 rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) { 492 Iterator<Result> iter = scanner.iterator(); 493 while (iter.hasNext()) { 494 iter.next(); 495 } 496 fail("Should have failed with RetriesExhaustedException"); 497 } catch (RuntimeException expected) { 498 assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class)); 499 } 500 } 501 502 public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { 503 504 public MockRpcRetryingCallerFactory(Configuration conf, 505 ConnectionConfiguration connectionConf) { 506 super(conf, connectionConf); 507 } 508 509 @Override 510 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 511 return new RpcRetryingCaller<T>() { 512 @Override 513 public void cancel() { 514 } 515 516 @Override 517 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 518 throws IOException, RuntimeException { 519 throw new IOException("Scanner exception"); 520 } 521 522 @Override 523 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 524 throws IOException, RuntimeException { 525 try { 526 return callable.call(callTimeout); 527 } catch (IOException e) { 528 throw e; 529 } catch (Exception e) { 530 throw new RuntimeException(e); 531 } 532 } 533 }; 534 } 535 } 536}