/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
 * <p/>
 * Every test issues one operation through an {@link AsyncConnection} whose region locator and
 * region server stub are replaced with mocks, and then verifies that the mocked
 * {@link ClientService.Interface} saw an {@link HBaseRpcController} carrying the expected
 * priority.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncTableRpcPriority {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);

  private static final Configuration CONF = HBaseConfiguration.create();

  /** Mocked region server stub; every RPC issued by {@link #conn} ends up here. */
  private ClientService.Interface stub;

  /** Single thread used by the scan mocks to answer scan RPCs off the calling thread. */
  private ExecutorService threadPool;

  private AsyncConnection conn;

  @Rule
  public TestName name = new TestName();

  /**
   * Creates the mocked stub with canned answers for {@code multi}, {@code mutate} and {@code get},
   * and a connection which always locates regions on a fixed fake server and always hands out the
   * mocked stub.
   */
  @Before
  public void setUp() throws IOException {
    this.threadPool = Executors.newSingleThreadExecutor();
    stub = mock(ClientService.Interface.class);

    // multi: reply with one region action result holding a single empty Result.
    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        ClientProtos.MultiResponse resp =
          ClientProtos.MultiResponse.newBuilder()
            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
            .build();
        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
        done.run(resp);
        return null;
      }
    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
    // mutate: an INCREMENT gets back a Result built from the first qualifier value of the
    // request, every other mutation type gets the default (empty) response.
    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
        MutateResponse resp;
        switch (req.getMutateType()) {
          case INCREMENT:
            ColumnValue value = req.getColumnValue(0);
            QualifierValue qvalue = value.getQualifierValue(0);
            Cell cell =
              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
                .setQualifier(qvalue.getQualifier().toByteArray())
                .setValue(qvalue.getValue().toByteArray()).build();
            resp = MutateResponse.newBuilder()
              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
            break;
          default:
            resp = MutateResponse.getDefaultInstance();
            break;
        }
        RpcCallback<MutateResponse> done = invocation.getArgument(2);
        done.run(resp);
        return null;
      }
    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
    // get: always reply with the default (empty) response.
    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        RpcCallback<GetResponse> done = invocation.getArgument(2);
        done.run(GetResponse.getDefaultInstance());
        return null;
      }
    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
    User user = UserProvider.instantiate(CONF).getCurrent();
    conn =
      new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", user) {

        @Override
        AsyncRegionLocator getLocator() {
          // Resolve every lookup to a region built from the requested table name, hosted on a
          // fixed fake region server.
          AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
          Answer<CompletableFuture<HRegionLocation>> answer =
            new Answer<CompletableFuture<HRegionLocation>>() {

              @Override
              public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
                throws Throwable {
                TableName tableName = invocation.getArgument(0);
                RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
                ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
                HRegionLocation loc = new HRegionLocation(info, serverName);
                return CompletableFuture.completedFuture(loc);
              }
            };
          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
            any(RegionLocateType.class), anyLong());
          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
            anyInt(), any(RegionLocateType.class), anyLong());
          return locator;
        }

        @Override
        ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
          return stub;
        }
      };
  }

  /**
   * Releases what {@link #setUp()} created, so that neither the connection nor the executor thread
   * outlives the test. Both fields are null-checked because JUnit runs this method even when
   * {@link #setUp()} fails part way through.
   */
  @After
  public void tearDown() throws IOException {
    try {
      if (conn != null) {
        conn.close();
      }
    } finally {
      if (threadPool != null) {
        // Harmless when a scan test has already shut the pool down.
        threadPool.shutdownNow();
      }
    }
  }

  /**
   * Registers a Mockito argument matcher accepting only controllers whose priority equals
   * {@code priority}.
   * @param priority the expected rpc priority
   * @return the placeholder value returned by Mockito's {@code argThat}
   */
  private HBaseRpcController assertPriority(int priority) {
    return argThat(new ArgumentMatcher<HBaseRpcController>() {

      @Override
      public boolean matches(HBaseRpcController controller) {
        return controller.getPriority() == priority;
      }
    });
  }

  /**
   * Registers a Mockito argument matcher accepting only scan requests which have the close scanner
   * flag present and set to {@code true}.
   * @return the placeholder value returned by Mockito's {@code argThat}
   */
  private ScanRequest assertScannerCloseRequest() {
    return argThat(new ArgumentMatcher<ScanRequest>() {

      @Override
      public boolean matches(ScanRequest request) {
        return request.hasCloseScanner() && request.getCloseScanner();
      }
    });
  }

  @Test
  public void testGet() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
  }

  @Test
  public void testGetNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
  }

  @Test
  public void testGetSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .get(new Get(Bytes.toBytes(0))).join();
    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
  }

  @Test
  public void testGetMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
  }

  @Test
  public void testPut() {
    conn
      .getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
        .setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
  }

  @Test
  public void testPutNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testPutSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
        Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testPutMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testDelete() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
    verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
  }

  @Test
  public void testDeleteNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
      .join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testDeleteSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .delete(new Delete(Bytes.toBytes(0))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testDeleteMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testAppend() {
    conn
      .getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
        .setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
  }

  @Test
  public void testAppendNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testAppendSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
        Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testAppendMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testIncrement() {
    conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join();
    verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
  }

  @Test
  public void testIncrementNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testIncrementSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testIncrementMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME)
      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndPut() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifNotExists()
      .thenPut(new Put(Bytes.toBytes(0))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16))
      .join();
    verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndPutNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
        Bytes.toBytes("cq"), Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndPutSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
        Bytes.toBytes("cq"), Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndPutMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
      .join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndDelete() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
    verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndDeleteNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndDeleteSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndDeleteMetaTable() {
    // Must go through thenDelete, like the other check-and-delete tests; a thenPut here would
    // only repeat testCheckAndPutMetaTable.
    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
      .thenDelete(new Delete(Bytes.toBytes(0))).join();
    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
  }

  @Test
  public void testCheckAndMutate() throws IOException {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0))
        .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18)))
      .join();
    verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
  }

  @Test
  public void testCheckAndMutateNormalTable() throws IOException {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v"))
      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
      .join();
    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
      any());
  }

  @Test
  public void testCheckAndMutateSystemTable() throws IOException {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
      .ifEquals(Bytes.toBytes("v"))
      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
      .join();
    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
      any(ClientProtos.MultiRequest.class), any());
  }

  @Test
  public void testCheckAndMutateMetaTable() throws IOException {
    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
      .join();
    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
      any(ClientProtos.MultiRequest.class), any());
  }

  /**
   * Stubs {@code scan} on the mocked region server stub. Two stubbings are installed, and both
   * answer on {@link #threadPool} rather than on the calling thread:
   * <ul>
   * <li>Calls whose controller carries {@code scanPriority}: a request without a scanner id is
   * answered with a response that sets scanner id 1; any other request is answered with one
   * generated cell. A request with the renew flag set also completes the returned future. A close
   * request is not expected here.</li>
   * <li>Calls whose controller carries {@code HIGH_QOS} and whose request is a close scanner
   * request: answered with the default response.</li>
   * </ul>
   * NOTE(review): the assertions run inside tasks handed to {@code threadPool.submit}, whose
   * returned future is ignored, so a failing assertion there does not by itself fail the test.
   * @param scanPriority the priority expected on open, next and renew scan calls
   * @return a future completed once the first renew request has been received
   */
  private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
    int scannerId = 1;
    CompletableFuture<Void> future = new CompletableFuture<>();
    AtomicInteger scanNextCalled = new AtomicInteger(0);
    doAnswer(new Answer<Void>() {

      @SuppressWarnings("FutureReturnValueIgnored")
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        threadPool.submit(() -> {
          ScanRequest req = invocation.getArgument(1);
          RpcCallback<ScanResponse> done = invocation.getArgument(2);
          if (!req.hasScannerId()) {
            // No scanner id yet, so hand one out.
            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
              .setMoreResultsInRegion(true).setMoreResults(true).build());
          } else {
            if (req.hasRenew() && req.getRenew()) {
              future.complete(null);
            }

            assertFalse("close scanner should not come in with scan priority " + scanPriority,
              req.hasCloseScanner() && req.getCloseScanner());

            // The row key comes from a counter, so each response carries a different row.
            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
              .setValue(Bytes.toBytes("v")).build();
            Result result = Result.create(Arrays.asList(cell));
            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
              .setMoreResultsInRegion(true).setMoreResults(true)
              .addResults(ProtobufUtil.toResult(result)).build());
          }
        });
        return null;
      }
    }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());

    doAnswer(new Answer<Void>() {

      @SuppressWarnings("FutureReturnValueIgnored")
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        threadPool.submit(() -> {
          ScanRequest req = invocation.getArgument(1);
          RpcCallback<ScanResponse> done = invocation.getArgument(2);
          assertTrue("close request should have scannerId", req.hasScannerId());
          assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
          assertTrue("close request should have closerScanner set",
            req.hasCloseScanner() && req.getCloseScanner());

          done.run(ScanResponse.getDefaultInstance());
        });
        return null;
      }
    }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
    return future;
  }

  @Test
  public void testScan() throws Exception {
    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
  }

  @Test
  public void testScanNormalTable() throws Exception {
    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
  }

  @Test
  public void testScanSystemTable() throws Exception {
    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), renewFuture,
      Optional.empty());
  }

  @Test
  public void testScanMetaTable() throws Exception {
    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
    testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
  }

  /**
   * Runs a scan against {@code tableName}, reads one result, waits for a lease renewal and closes
   * the scanner, then verifies the scan calls recorded on the mocked stub.
   * @param tableName   the table to scan
   * @param renewFuture the future returned by {@link #mockScanReturnRenewFuture(int)}
   * @param priority    the priority to set explicitly on the scan; when empty, the scan's priority
   *                    is left unset
   */
  private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
    Optional<Integer> priority) throws Exception {
    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
    priority.ifPresent(scan::setPriority);

    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
      assertNotNull(scanner.next());
      // wait for at least one renew to come in before closing
      renewFuture.join();
    }

    // ensures the close thread has time to finish before asserting
    threadPool.shutdown();
    threadPool.awaitTermination(5, TimeUnit.SECONDS);

    // just verify that the calls happened. verification of priority occurred in the mocking
    // open, next, then one or more lease renewals, then close
    verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
    // additionally, explicitly check for a close request
    verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
  }

  @Test
  public void testBatchNormalTable() {
    conn.getTable(TableName.valueOf(name.getMethodName()))
      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
      any());
  }

  @Test
  public void testBatchSystemTable() {
    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
      any(ClientProtos.MultiRequest.class), any());
  }

  @Test
  public void testBatchMetaTable() {
    conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
      .join();
    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
      any(ClientProtos.MultiRequest.class), any());
  }
}