001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Objects;
025import java.util.Random;
026import java.util.Set;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@InterfaceAudience.Private
046public final class ThrottleQuotaTestUtil {
047
048  private final static Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class);
049  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
050  private final static int REFRESH_TIME = 30 * 60000;
051  static {
052    envEdge.setValue(EnvironmentEdgeManager.currentTime());
053    // only active the envEdge for quotas package
054    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
055      ThrottleQuotaTestUtil.class.getPackage().getName());
056  }
057
058  private ThrottleQuotaTestUtil() {
059    // Hide utility class constructor
060    LOG.debug("Call constructor of ThrottleQuotaTestUtil");
061  }
062
063  static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
064    return doPuts(maxOps, -1, family, qualifier, tables);
065  }
066
067  static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier,
068    final Table... tables) {
069    int count = 0;
070    try {
071      while (count < maxOps) {
072        Put put = new Put(Bytes.toBytes("row-" + count));
073        byte[] value;
074        if (valueSize < 0) {
075          value = Bytes.toBytes("data-" + count);
076        } else {
077          value = generateValue(valueSize);
078        }
079        put.addColumn(family, qualifier, value);
080        for (final Table table : tables) {
081          table.put(put);
082        }
083        count += tables.length;
084      }
085    } catch (IOException e) {
086      LOG.error("put failed after nRetries=" + count, e);
087    }
088    return count;
089  }
090
091  private static byte[] generateValue(int valueSize) {
092    byte[] bytes = new byte[valueSize];
093    for (int i = 0; i < valueSize; i++) {
094      bytes[i] = 'a';
095    }
096    return bytes;
097  }
098
099  static long doGets(int maxOps, final Table... tables) {
100    int count = 0;
101    try {
102      while (count < maxOps) {
103        Get get = new Get(Bytes.toBytes("row-" + count));
104        for (final Table table : tables) {
105          table.get(get);
106        }
107        count += tables.length;
108      }
109    } catch (IOException e) {
110      LOG.error("get failed after nRetries=" + count, e);
111    }
112    return count;
113  }
114
115  static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
116    int count = 0;
117    try {
118      while (count < maxOps) {
119        Get get = new Get(Bytes.toBytes("row-" + count));
120        get.addColumn(family, qualifier);
121        for (final Table table : tables) {
122          table.get(get);
123        }
124        count += tables.length;
125      }
126    } catch (IOException e) {
127      LOG.error("get failed after nRetries=" + count, e);
128    }
129    return count;
130  }
131
132  static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier,
133    final Table... tables) {
134    int opCount = 0;
135    Random random = new Random();
136    try {
137      while (opCount < maxOps) {
138        List<Get> gets = new ArrayList<>(batchSize);
139        while (gets.size() < batchSize) {
140          Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount)));
141          get.addColumn(family, qualifier);
142          gets.add(get);
143        }
144        for (final Table table : tables) {
145          table.get(gets);
146        }
147        opCount += tables.length;
148      }
149    } catch (IOException e) {
150      LOG.error("multiget failed after nRetries=" + opCount, e);
151    }
152    return opCount;
153  }
154
155  static long doScans(int desiredRows, Table table, int caching) {
156    int count = 0;
157    try {
158      Scan scan = new Scan();
159      scan.setCaching(caching);
160      scan.setCacheBlocks(false);
161      ResultScanner scanner = table.getScanner(scan);
162      while (count < desiredRows) {
163        scanner.next();
164        count += 1;
165      }
166    } catch (IOException e) {
167      LOG.error("scan failed after nRetries=" + count, e);
168    }
169    return count;
170  }
171
172  static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
173    TableName... tables) throws Exception {
174    triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables);
175  }
176
177  static void triggerTableCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
178    TableName... tables) throws Exception {
179    triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables);
180  }
181
182  static void triggerNamespaceCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
183    TableName... tables) throws Exception {
184    triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables);
185  }
186
187  static void triggerRegionServerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass)
188    throws Exception {
189    triggerCacheRefresh(testUtil, bypass, false, false, false, true, false);
190  }
191
192  static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtil testUtil,
193    boolean exceedEnabled) throws Exception {
194    triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true);
195  }
196
197  private static void triggerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
198    boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter,
199    boolean exceedThrottleQuota, final TableName... tables) throws Exception {
200    envEdge.incValue(2 * REFRESH_TIME);
201    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
202      RegionServerRpcQuotaManager quotaManager =
203        rst.getRegionServer().getRegionServerRpcQuotaManager();
204      QuotaCache quotaCache = quotaManager.getQuotaCache();
205      quotaCache.triggerCacheRefresh();
206      Thread.sleep(250);
207      testUtil.waitFor(60000, 250, new ExplainingPredicate<Exception>() {
208
209        @Override
210        public boolean evaluate() throws Exception {
211          boolean isUpdated = true;
212          for (TableName table : tables) {
213            if (userLimiter) {
214              boolean isUserBypass =
215                quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
216              if (isUserBypass != bypass) {
217                LOG.info(
218                  "User limiter for user={}, table={} not refreshed, bypass expected {}, actual {}",
219                  User.getCurrent(), table, bypass, isUserBypass);
220                envEdge.incValue(100);
221                isUpdated = false;
222                break;
223              }
224            }
225            if (tableLimiter) {
226              boolean isTableBypass = quotaCache.getTableLimiter(table).isBypass();
227              if (isTableBypass != bypass) {
228                LOG.info("Table limiter for table={} not refreshed, bypass expected {}, actual {}",
229                  table, bypass, isTableBypass);
230                envEdge.incValue(100);
231                isUpdated = false;
232                break;
233              }
234            }
235            if (nsLimiter) {
236              boolean isNsBypass =
237                quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
238              if (isNsBypass != bypass) {
239                LOG.info(
240                  "Namespace limiter for namespace={} not refreshed, bypass expected {}, actual {}",
241                  table.getNamespaceAsString(), bypass, isNsBypass);
242                envEdge.incValue(100);
243                isUpdated = false;
244                break;
245              }
246            }
247          }
248          if (rsLimiter) {
249            boolean rsIsBypass = quotaCache
250              .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
251            if (rsIsBypass != bypass) {
252              LOG.info("RegionServer limiter not refreshed, bypass expected {}, actual {}", bypass,
253                rsIsBypass);
254              envEdge.incValue(100);
255              isUpdated = false;
256            }
257          }
258          if (exceedThrottleQuota) {
259            if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
260              LOG.info("ExceedThrottleQuotaEnabled not refreshed, bypass expected {}, actual {}",
261                bypass, quotaCache.isExceedThrottleQuotaEnabled());
262              envEdge.incValue(100);
263              isUpdated = false;
264            }
265          }
266          if (isUpdated) {
267            return true;
268          }
269          quotaCache.triggerCacheRefresh();
270          return false;
271        }
272
273        @Override
274        public String explainFailure() throws Exception {
275          return "Quota cache is still not refreshed";
276        }
277      });
278
279      LOG.debug("QuotaCache");
280      LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
281      LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
282      LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
283      LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
284    }
285  }
286
287  static Set<QuotaCache> getQuotaCaches(HBaseTestingUtil testUtil) {
288    Set<QuotaCache> quotaCaches = new HashSet<>();
289    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
290      RegionServerRpcQuotaManager quotaManager =
291        rst.getRegionServer().getRegionServerRpcQuotaManager();
292      quotaCaches.add(quotaManager.getQuotaCache());
293    }
294    return quotaCaches;
295  }
296
297  static void waitMinuteQuota() {
298    envEdge.incValue(70000);
299  }
300
301  static void clearQuotaCache(HBaseTestingUtil testUtil) {
302    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
303      RegionServerRpcQuotaManager quotaManager =
304        rst.getRegionServer().getRegionServerRpcQuotaManager();
305      QuotaCache quotaCache = quotaManager.getQuotaCache();
306      quotaCache.getNamespaceQuotaCache().clear();
307      quotaCache.getTableQuotaCache().clear();
308      quotaCache.getUserQuotaCache().clear();
309      quotaCache.getRegionServerQuotaCache().clear();
310    }
311  }
312}