001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertTrue;
028import static org.mockito.ArgumentMatchers.any;
029import static org.mockito.ArgumentMatchers.anyInt;
030import static org.mockito.ArgumentMatchers.anyLong;
031import static org.mockito.ArgumentMatchers.argThat;
032import static org.mockito.Mockito.atLeast;
033import static org.mockito.Mockito.doAnswer;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.times;
036import static org.mockito.Mockito.verify;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.Optional;
041import java.util.concurrent.CompletableFuture;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicInteger;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.CellBuilderFactory;
049import org.apache.hadoop.hbase.CellBuilderType;
050import org.apache.hadoop.hbase.HBaseClassTestRule;
051import org.apache.hadoop.hbase.HBaseConfiguration;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.ServerName;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.ipc.HBaseRpcController;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.security.UserProvider;
058import org.apache.hadoop.hbase.testclassification.ClientTests;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.junit.Before;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.TestName;
067import org.mockito.ArgumentMatcher;
068import org.mockito.invocation.InvocationOnMock;
069import org.mockito.stubbing.Answer;
070
071import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
072
073import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
087
088/**
089 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
090 */
091@Category({ ClientTests.class, MediumTests.class })
092public class TestAsyncTableRpcPriority {
093
094  @ClassRule
095  public static final HBaseClassTestRule CLASS_RULE =
096    HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);
097
098  private static Configuration CONF = HBaseConfiguration.create();
099
100  private ClientService.Interface stub;
101
102  private ExecutorService threadPool;
103
104  private AsyncConnection conn;
105
106  @Rule
107  public TestName name = new TestName();
108
109  @Before
110  public void setUp() throws IOException {
111    this.threadPool = Executors.newSingleThreadExecutor();
112    stub = mock(ClientService.Interface.class);
113
114    doAnswer(new Answer<Void>() {
115
116      @Override
117      public Void answer(InvocationOnMock invocation) throws Throwable {
118        ClientProtos.MultiResponse resp =
119          ClientProtos.MultiResponse.newBuilder()
120            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
121              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
122            .build();
123        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
124        done.run(resp);
125        return null;
126      }
127    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
128    doAnswer(new Answer<Void>() {
129
130      @Override
131      public Void answer(InvocationOnMock invocation) throws Throwable {
132        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
133        MutateResponse resp;
134        switch (req.getMutateType()) {
135          case INCREMENT:
136            ColumnValue value = req.getColumnValue(0);
137            QualifierValue qvalue = value.getQualifierValue(0);
138            Cell cell =
139              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
140                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
141                .setQualifier(qvalue.getQualifier().toByteArray())
142                .setValue(qvalue.getValue().toByteArray()).build();
143            resp = MutateResponse.newBuilder()
144              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
145            break;
146          default:
147            resp = MutateResponse.getDefaultInstance();
148            break;
149        }
150        RpcCallback<MutateResponse> done = invocation.getArgument(2);
151        done.run(resp);
152        return null;
153      }
154    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
155    doAnswer(new Answer<Void>() {
156
157      @Override
158      public Void answer(InvocationOnMock invocation) throws Throwable {
159        RpcCallback<GetResponse> done = invocation.getArgument(2);
160        done.run(GetResponse.getDefaultInstance());
161        return null;
162      }
163    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
164    User user = UserProvider.instantiate(CONF).getCurrent();
165    conn =
166      new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", user) {
167
168        @Override
169        AsyncRegionLocator getLocator() {
170          AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
171          Answer<CompletableFuture<HRegionLocation>> answer =
172            new Answer<CompletableFuture<HRegionLocation>>() {
173
174              @Override
175              public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
176                throws Throwable {
177                TableName tableName = invocation.getArgument(0);
178                RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
179                ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
180                HRegionLocation loc = new HRegionLocation(info, serverName);
181                return CompletableFuture.completedFuture(loc);
182              }
183            };
184          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
185            any(RegionLocateType.class), anyLong());
186          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
187            anyInt(), any(RegionLocateType.class), anyLong());
188          return locator;
189        }
190
191        @Override
192        ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
193          return stub;
194        }
195      };
196  }
197
198  private HBaseRpcController assertPriority(int priority) {
199    return argThat(new ArgumentMatcher<HBaseRpcController>() {
200
201      @Override
202      public boolean matches(HBaseRpcController controller) {
203        return controller.getPriority() == priority;
204      }
205    });
206  }
207
208  private ScanRequest assertScannerCloseRequest() {
209    return argThat(new ArgumentMatcher<ScanRequest>() {
210
211      @Override
212      public boolean matches(ScanRequest request) {
213        return request.hasCloseScanner() && request.getCloseScanner();
214      }
215    });
216  }
217
218  @Test
219  public void testGet() {
220    conn.getTable(TableName.valueOf(name.getMethodName()))
221      .get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
222    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
223  }
224
225  @Test
226  public void testGetNormalTable() {
227    conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
228    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
229  }
230
231  @Test
232  public void testGetSystemTable() {
233    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
234      .get(new Get(Bytes.toBytes(0))).join();
235    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
236  }
237
238  @Test
239  public void testGetMetaTable() {
240    conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
241    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
242  }
243
244  @Test
245  public void testPut() {
246    conn
247      .getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
248        .setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
249      .join();
250    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
251  }
252
253  @Test
254  public void testPutNormalTable() {
255    conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
256      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
257    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
258  }
259
260  @Test
261  public void testPutSystemTable() {
262    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
263      .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
264        Bytes.toBytes("v")))
265      .join();
266    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
267  }
268
269  @Test
270  public void testPutMetaTable() {
271    conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0))
272      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
273    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
274  }
275
276  @Test
277  public void testDelete() {
278    conn.getTable(TableName.valueOf(name.getMethodName()))
279      .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
280    verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
281  }
282
283  @Test
284  public void testDeleteNormalTable() {
285    conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
286      .join();
287    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
288  }
289
290  @Test
291  public void testDeleteSystemTable() {
292    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
293      .delete(new Delete(Bytes.toBytes(0))).join();
294    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
295  }
296
297  @Test
298  public void testDeleteMetaTable() {
299    conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
300    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
301  }
302
303  @Test
304  public void testAppend() {
305    conn
306      .getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
307        .setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
308      .join();
309    verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
310  }
311
312  @Test
313  public void testAppendNormalTable() {
314    conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
315      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
316    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
317  }
318
319  @Test
320  public void testAppendSystemTable() {
321    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
322      .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
323        Bytes.toBytes("v")))
324      .join();
325    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
326  }
327
328  @Test
329  public void testAppendMetaTable() {
330    conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0))
331      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
332    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
333  }
334
335  @Test
336  public void testIncrement() {
337    conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0))
338      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join();
339    verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
340  }
341
342  @Test
343  public void testIncrementNormalTable() {
344    conn.getTable(TableName.valueOf(name.getMethodName()))
345      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
346    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
347  }
348
349  @Test
350  public void testIncrementSystemTable() {
351    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
352      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
353    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
354  }
355
356  @Test
357  public void testIncrementMetaTable() {
358    conn.getTable(TableName.META_TABLE_NAME)
359      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
360    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
361  }
362
363  @Test
364  public void testCheckAndPut() {
365    conn.getTable(TableName.valueOf(name.getMethodName()))
366      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
367      .ifNotExists()
368      .thenPut(new Put(Bytes.toBytes(0))
369        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16))
370      .join();
371    verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
372  }
373
374  @Test
375  public void testCheckAndPutNormalTable() {
376    conn.getTable(TableName.valueOf(name.getMethodName()))
377      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
378      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
379        Bytes.toBytes("cq"), Bytes.toBytes("v")))
380      .join();
381    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
382  }
383
384  @Test
385  public void testCheckAndPutSystemTable() {
386    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
387      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
388      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
389        Bytes.toBytes("cq"), Bytes.toBytes("v")))
390      .join();
391    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
392  }
393
394  @Test
395  public void testCheckAndPutMetaTable() {
396    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
397      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
398        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
399      .join();
400    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
401  }
402
403  @Test
404  public void testCheckAndDelete() {
405    conn.getTable(TableName.valueOf(name.getMethodName()))
406      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
407      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
408    verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
409  }
410
411  @Test
412  public void testCheckAndDeleteNormalTable() {
413    conn.getTable(TableName.valueOf(name.getMethodName()))
414      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
415      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
416    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
417  }
418
419  @Test
420  public void testCheckAndDeleteSystemTable() {
421    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
422      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
423      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
424    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
425  }
426
427  @Test
428  public void testCheckAndDeleteMetaTable() {
429    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
430      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
431        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
432      .join();
433    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
434  }
435
436  @Test
437  public void testCheckAndMutate() throws IOException {
438    conn.getTable(TableName.valueOf(name.getMethodName()))
439      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
440      .ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0))
441        .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18)))
442      .join();
443    verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
444  }
445
446  @Test
447  public void testCheckAndMutateNormalTable() throws IOException {
448    conn.getTable(TableName.valueOf(name.getMethodName()))
449      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
450      .ifEquals(Bytes.toBytes("v"))
451      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
452      .join();
453    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
454      any());
455  }
456
457  @Test
458  public void testCheckAndMutateSystemTable() throws IOException {
459    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
460      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
461      .ifEquals(Bytes.toBytes("v"))
462      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
463      .join();
464    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
465      any(ClientProtos.MultiRequest.class), any());
466  }
467
468  @Test
469  public void testCheckAndMutateMetaTable() throws IOException {
470    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
471      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
472      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
473      .join();
474    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
475      any(ClientProtos.MultiRequest.class), any());
476  }
477
478  private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
479    int scannerId = 1;
480    CompletableFuture<Void> future = new CompletableFuture<>();
481    AtomicInteger scanNextCalled = new AtomicInteger(0);
482    doAnswer(new Answer<Void>() {
483
484      @SuppressWarnings("FutureReturnValueIgnored")
485      @Override
486      public Void answer(InvocationOnMock invocation) throws Throwable {
487        threadPool.submit(() -> {
488          ScanRequest req = invocation.getArgument(1);
489          RpcCallback<ScanResponse> done = invocation.getArgument(2);
490          if (!req.hasScannerId()) {
491            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
492              .setMoreResultsInRegion(true).setMoreResults(true).build());
493          } else {
494            if (req.hasRenew() && req.getRenew()) {
495              future.complete(null);
496            }
497
498            assertFalse("close scanner should not come in with scan priority " + scanPriority,
499              req.hasCloseScanner() && req.getCloseScanner());
500
501            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
502              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
503              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
504              .setValue(Bytes.toBytes("v")).build();
505            Result result = Result.create(Arrays.asList(cell));
506            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
507              .setMoreResultsInRegion(true).setMoreResults(true)
508              .addResults(ProtobufUtil.toResult(result)).build());
509          }
510        });
511        return null;
512      }
513    }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());
514
515    doAnswer(new Answer<Void>() {
516
517      @SuppressWarnings("FutureReturnValueIgnored")
518      @Override
519      public Void answer(InvocationOnMock invocation) throws Throwable {
520        threadPool.submit(() -> {
521          ScanRequest req = invocation.getArgument(1);
522          RpcCallback<ScanResponse> done = invocation.getArgument(2);
523          assertTrue("close request should have scannerId", req.hasScannerId());
524          assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
525          assertTrue("close request should have closerScanner set",
526            req.hasCloseScanner() && req.getCloseScanner());
527
528          done.run(ScanResponse.getDefaultInstance());
529        });
530        return null;
531      }
532    }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
533    return future;
534  }
535
536  @Test
537  public void testScan() throws Exception {
538    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
539    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
540  }
541
542  @Test
543  public void testScanNormalTable() throws Exception {
544    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
545    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
546  }
547
548  @Test
549  public void testScanSystemTable() throws Exception {
550    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
551    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), renewFuture,
552      Optional.empty());
553  }
554
555  @Test
556  public void testScanMetaTable() throws Exception {
557    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
558    testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
559  }
560
561  private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
562    Optional<Integer> priority) throws Exception {
563    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
564    priority.ifPresent(scan::setPriority);
565
566    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
567      assertNotNull(scanner.next());
568      // wait for at least one renew to come in before closing
569      renewFuture.join();
570    }
571
572    // ensures the close thread has time to finish before asserting
573    threadPool.shutdown();
574    threadPool.awaitTermination(5, TimeUnit.SECONDS);
575
576    // just verify that the calls happened. verification of priority occurred in the mocking
577    // open, next, then one or more lease renewals, then close
578    verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
579    // additionally, explicitly check for a close request
580    verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
581  }
582
583  @Test
584  public void testBatchNormalTable() {
585    conn.getTable(TableName.valueOf(name.getMethodName()))
586      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
587    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
588      any());
589  }
590
591  @Test
592  public void testBatchSystemTable() {
593    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
594      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
595    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
596      any(ClientProtos.MultiRequest.class), any());
597  }
598
599  @Test
600  public void testBatchMetaTable() {
601    conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
602      .join();
603    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
604      any(ClientProtos.MultiRequest.class), any());
605  }
606}