/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks that closing an {@link AsyncTableResultScanner} while it is suspended releases the
 * scanner held by the region server and that no further results are delivered to the consumer
 * afterwards.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableScannerCloseWhileSuspending {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableScannerCloseWhileSuspending.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME = TableName.valueOf("async");

  private static final byte[] FAMILY = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static AsyncConnection CONN;

  private static AsyncTable<?> TABLE;

  /**
   * Starts the mini cluster, creates the test table and loads 100 rows into it. Row keys are the
   * two-digit strings "00" to "99"; each row stores its index under {@code cf:cq}.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
    TABLE.putAll(IntStream.range(0, 100).mapToObj(
      i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
      .collect(Collectors.toList())).get();
  }

  /**
   * Closes the shared connection and shuts the mini cluster down. The shutdown runs in a
   * {@code finally} block so that a failing (or never created) connection cannot leave the
   * cluster running.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      // CONN is still null when setUp failed before the connection was created.
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Returns the number of scanners reported by the RPC services of all region servers in the mini
   * cluster, summed together.
   */
  private int getScannersCount() {
    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
      .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()).sum();
  }

  /**
   * Starts a scan, waits until the scanner reports itself as suspended, closes it, and then
   * verifies that the region server scanner count drops to zero, that {@code onComplete} fires,
   * and that {@code onNext} was not invoked again after the close.
   */
  @Test
  public void testCloseScannerWhileSuspending() throws Exception {
    final AtomicInteger onNextCounter = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(1);
    // NOTE(review): a max result size of 1 together with a scanner cache size of 1 presumably
    // makes the scanner suspend right after the first batch; the assertions below rely on
    // exactly one onNext call having happened by then.
    final Scan scan = new Scan().setMaxResultSize(1);
    final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
      @Override
      public void onNext(Result[] results, ScanController controller) {
        // Count every delivery before handing it to the real implementation.
        onNextCounter.incrementAndGet();
        super.onNext(results, controller);
      }

      @Override
      public void onComplete() {
        super.onComplete();
        // Signal the test thread that the scan has finished.
        latch.countDown();
      }
    };

    CONN.getTable(TABLE_NAME).scan(scan, scanner);

    TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return scanner.isSuspended();
      }

      @Override
      public String explainFailure() throws Exception {
        // Reported only when the wait above fails, i.e. the scanner never got suspended.
        return "The given scanner has not been suspended in time";
      }
    });
    assertEquals(1, getScannersCount());
    assertEquals(1, onNextCounter.get());

    scanner.close();
    TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return getScannersCount() == 0;
      }

      @Override
      public String explainFailure() throws Exception {
        return "Still have " + getScannersCount() + " scanners opened";
      }
    });
    // Block until onComplete has fired so the final counter check sees every delivery.
    latch.await();
    assertEquals(1, onNextCounter.get());
  }
}