001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
027import org.apache.hadoop.hbase.StartTestingClusterOption;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.quotas.OperationQuota;
037import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
038import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
039import org.apache.hadoop.hbase.quotas.TestNoopOperationQuota;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.AfterClass;
044import org.junit.Before;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
051
052@Category({ MediumTests.class, ClientTests.class })
053public class TestScannerLeaseCount {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestScannerLeaseCount.class);
058
059  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
060  private static final TableName TABLE_NAME = TableName.valueOf("ScannerLeaseCount");
061  private static final byte[] FAM = Bytes.toBytes("Fam");
062  private static final String SCAN_IDENTIFIER_NAME = "_scan_id_";
063  private static final byte[] SCAN_IDENTIFIER = Bytes.toBytes("_scan_id_");
064  private static final Scan SCAN = new Scan().setAttribute(SCAN_IDENTIFIER_NAME, SCAN_IDENTIFIER);
065
066  private static volatile boolean SHOULD_THROW = false;
067  private static final AtomicBoolean EXCEPTION_THROWN = new AtomicBoolean(false);
068  private static final AtomicBoolean SCAN_SEEN = new AtomicBoolean(false);
069
070  private static Connection CONN;
071  private static Table TABLE;
072
073  @BeforeClass
074  public static void setUp() throws Exception {
075    StartTestingClusterOption option =
076      StartTestingClusterOption.builder().rsClass(MockedQuotaManagerRegionServer.class).build();
077    UTIL.startMiniCluster(option);
078    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
079      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAM)).build());
080    Configuration conf = new Configuration(UTIL.getConfiguration());
081    CONN = ConnectionFactory.createConnection(conf);
082    TABLE = CONN.getTable(TABLE_NAME);
083    UTIL.loadTable(TABLE, FAM);
084  }
085
086  @AfterClass
087  public static void tearDown() throws Exception {
088    try {
089      TABLE.close();
090    } catch (Exception ignore) {
091    }
092    try {
093      CONN.close();
094    } catch (Exception ignore) {
095    }
096    UTIL.shutdownMiniCluster();
097  }
098
099  @Before
100  public void before() {
101    SHOULD_THROW = false;
102    SCAN_SEEN.set(false);
103    EXCEPTION_THROWN.set(false);
104  }
105
106  @Test
107  public void itIncreasesScannerCount() throws Exception {
108    try (ResultScanner ignore = TABLE.getScanner(SCAN)) {
109      // We need to wait until the scan and lease are created server-side.
110      // Otherwise, our scanner counting will not reflect the new scan that was created
111      UTIL.waitFor(1000, () -> SCAN_SEEN.get() && !EXCEPTION_THROWN.get());
112    }
113  }
114
115  @Test
116  public void itDoesNotIncreaseScannerLeaseCount() throws Exception {
117    SHOULD_THROW = true;
118    try (ResultScanner ignore = TABLE.getScanner(SCAN)) {
119      // We need to wait until the scan and lease are created server-side.
120      // Otherwise, our scanner counting will not reflect the new scan that was created
121      UTIL.waitFor(1000, () -> !SCAN_SEEN.get() && EXCEPTION_THROWN.get());
122    }
123  }
124
125  public static final class MockedQuotaManagerRegionServer
126    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
127    private final MockedRpcQuotaManager rpcQuotaManager;
128
129    public MockedQuotaManagerRegionServer(Configuration conf)
130      throws IOException, InterruptedException {
131      super(conf);
132      this.rpcQuotaManager = new MockedRpcQuotaManager(this);
133    }
134
135    @Override
136    public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
137      return rpcQuotaManager;
138    }
139
140    @Override
141    protected RSRpcServices createRpcServices() throws IOException {
142      return new ScannerTrackingRSRpcServicesForTest(this);
143    }
144  }
145
146  private static class MockedRpcQuotaManager extends RegionServerRpcQuotaManager {
147    private static final RpcThrottlingException EX = new RpcThrottlingException("test_ex");
148
149    public MockedRpcQuotaManager(RegionServerServices rsServices) {
150      super(rsServices);
151    }
152
153    @Override
154    public OperationQuota checkScanQuota(Region region, ClientProtos.ScanRequest scanRequest,
155      long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
156      throws IOException, RpcThrottlingException {
157      if (SHOULD_THROW) {
158        if (isTestScan(scanRequest)) {
159          EXCEPTION_THROWN.set(true);
160        }
161        throw EX;
162      }
163      return TestNoopOperationQuota.INSTANCE;
164    }
165
166    @Override
167    public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
168      throws IOException, RpcThrottlingException {
169      if (SHOULD_THROW) {
170        throw EX;
171      }
172      return TestNoopOperationQuota.INSTANCE;
173    }
174
175    @Override
176    public OperationQuota checkBatchQuota(Region region, List<ClientProtos.Action> actions,
177      boolean hasCondition) throws IOException, RpcThrottlingException {
178      if (SHOULD_THROW) {
179        throw EX;
180      }
181      return TestNoopOperationQuota.INSTANCE;
182    }
183
184    @Override
185    public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads)
186      throws IOException, RpcThrottlingException {
187      if (SHOULD_THROW) {
188        throw EX;
189      }
190      return TestNoopOperationQuota.INSTANCE;
191    }
192  }
193
194  private static class ScannerTrackingRSRpcServicesForTest extends RSRpcServices {
195    public ScannerTrackingRSRpcServicesForTest(HRegionServer rs) throws IOException {
196      super(rs);
197    }
198
199    @Override
200    RegionScannerContext checkQuotaAndGetRegionScannerContext(ClientProtos.ScanRequest request,
201      ClientProtos.ScanResponse.Builder builder) throws IOException {
202      RegionScannerContext rsx = super.checkQuotaAndGetRegionScannerContext(request, builder);
203      if (isTestScan(request)) {
204        SCAN_SEEN.set(true);
205      }
206      return rsx;
207    }
208  }
209
210  private static boolean isTestScan(ClientProtos.ScanRequest request) {
211    ClientProtos.Scan scan = request.getScan();
212    return scan.getAttributeList().stream()
213      .anyMatch(nbp -> nbp.getName().equals(SCAN_IDENTIFIER_NAME)
214        && Bytes.equals(nbp.getValue().toByteArray(), SCAN_IDENTIFIER));
215  }
216}