001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.quotas.OperationQuota;
037import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
038import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047@Category({ MediumTests.class, CoprocessorTests.class })
048public class TestRegionCoprocessorQuotaUsage {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);
053
054  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
055  private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
056  private static byte[] CF = Bytes.toBytes("CF");
057  private static byte[] CQ = Bytes.toBytes("CQ");
058  private static Connection CONN;
059  private static Table TABLE;
060  private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false);
061
062  public static class MyRegionObserver implements RegionObserver {
063    @Override
064    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
065      List<Cell> result) throws IOException {
066
067      // For the purposes of this test, we only need to catch a throttle happening once, then
068      // let future requests pass through so we don't make this test take any longer than necessary
069      if (!THROTTLING_OCCURRED.get()) {
070        try {
071          c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
072            OperationQuota.OperationType.GET);
073        } catch (RpcThrottlingException e) {
074          THROTTLING_OCCURRED.set(true);
075          throw e;
076        }
077      }
078    }
079  }
080
081  public static class MyCoprocessor implements RegionCoprocessor {
082    RegionObserver observer = new MyRegionObserver();
083
084    @Override
085    public Optional<RegionObserver> getRegionObserver() {
086      return Optional.of(observer);
087    }
088  }
089
090  @BeforeClass
091  public static void setUp() throws Exception {
092    Configuration conf = UTIL.getConfiguration();
093    conf.setBoolean("hbase.quota.enabled", true);
094    conf.setInt("hbase.quota.default.user.machine.read.num", 2);
095    conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
096    conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000");
097    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
098    UTIL.startMiniCluster(3);
099    byte[][] splitKeys = new byte[8][];
100    for (int i = 111; i < 999; i += 111) {
101      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
102    }
103    UTIL.createTable(TABLE_NAME, CF, splitKeys);
104    CONN = UTIL.getConnection();
105    TABLE = CONN.getTable(TABLE_NAME);
106    TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L)));
107  }
108
109  @AfterClass
110  public static void tearDown() throws Exception {
111    UTIL.shutdownMiniCluster();
112  }
113
114  @Test
115  public void testGet() throws InterruptedException, ExecutionException, IOException {
116    // Hit the table 5 times which ought to be enough to make a throttle happen
117    for (int i = 0; i < 5; i++) {
118      TABLE.get(new Get(Bytes.toBytes("000")));
119    }
120    assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get());
121  }
122}