001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Objects; 025import java.util.Random; 026import java.util.Set; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@InterfaceAudience.Private 046public final class ThrottleQuotaTestUtil { 047 048 private final static Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class); 049 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 050 private final static int REFRESH_TIME = 30 * 60000; 051 static { 052 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 053 // only active the envEdge for quotas package 054 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 055 ThrottleQuotaTestUtil.class.getPackage().getName()); 056 } 057 058 private ThrottleQuotaTestUtil() { 059 // Hide utility class constructor 060 LOG.debug("Call constructor of ThrottleQuotaTestUtil"); 061 } 062 063 static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { 064 return doPuts(maxOps, -1, family, qualifier, tables); 065 } 066 067 static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier, 068 final Table... tables) { 069 int count = 0; 070 try { 071 while (count < maxOps) { 072 Put put = new Put(Bytes.toBytes("row-" + count)); 073 byte[] value; 074 if (valueSize < 0) { 075 value = Bytes.toBytes("data-" + count); 076 } else { 077 value = generateValue(valueSize); 078 } 079 put.addColumn(family, qualifier, value); 080 for (final Table table : tables) { 081 table.put(put); 082 } 083 count += tables.length; 084 } 085 } catch (IOException e) { 086 LOG.error("put failed after nRetries=" + count, e); 087 } 088 return count; 089 } 090 091 private static byte[] generateValue(int valueSize) { 092 byte[] bytes = new byte[valueSize]; 093 for (int i = 0; i < valueSize; i++) { 094 bytes[i] = 'a'; 095 } 096 return bytes; 097 } 098 099 static long doGets(int maxOps, final Table... tables) { 100 int count = 0; 101 try { 102 while (count < maxOps) { 103 Get get = new Get(Bytes.toBytes("row-" + count)); 104 for (final Table table : tables) { 105 table.get(get); 106 } 107 count += tables.length; 108 } 109 } catch (IOException e) { 110 LOG.error("get failed after nRetries=" + count, e); 111 } 112 return count; 113 } 114 115 static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { 116 int count = 0; 117 try { 118 while (count < maxOps) { 119 Get get = new Get(Bytes.toBytes("row-" + count)); 120 get.addColumn(family, qualifier); 121 for (final Table table : tables) { 122 table.get(get); 123 } 124 count += tables.length; 125 } 126 } catch (IOException e) { 127 LOG.error("get failed after nRetries=" + count, e); 128 } 129 return count; 130 } 131 132 static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, 133 final Table... tables) { 134 int opCount = 0; 135 Random random = new Random(); 136 try { 137 while (opCount < maxOps) { 138 List<Get> gets = new ArrayList<>(batchSize); 139 while (gets.size() < batchSize) { 140 Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount))); 141 get.addColumn(family, qualifier); 142 gets.add(get); 143 } 144 for (final Table table : tables) { 145 table.get(gets); 146 } 147 opCount += tables.length; 148 } 149 } catch (IOException e) { 150 LOG.error("multiget failed after nRetries=" + opCount, e); 151 } 152 return opCount; 153 } 154 155 static long doScans(int desiredRows, Table table, int caching) { 156 int count = 0; 157 try { 158 Scan scan = new Scan(); 159 scan.setCaching(caching); 160 scan.setCacheBlocks(false); 161 ResultScanner scanner = table.getScanner(scan); 162 while (count < desiredRows) { 163 scanner.next(); 164 count += 1; 165 } 166 } catch (IOException e) { 167 LOG.error("scan failed after nRetries=" + count, e); 168 } 169 return count; 170 } 171 172 static void triggerUserCacheRefresh(HBaseTestingUtility testUtil, boolean bypass, 173 TableName... tables) throws Exception { 174 triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables); 175 } 176 177 static void triggerTableCacheRefresh(HBaseTestingUtility testUtil, boolean bypass, 178 TableName... tables) throws Exception { 179 triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables); 180 } 181 182 static void triggerNamespaceCacheRefresh(HBaseTestingUtility testUtil, boolean bypass, 183 TableName... tables) throws Exception { 184 triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables); 185 } 186 187 static void triggerRegionServerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass) 188 throws Exception { 189 triggerCacheRefresh(testUtil, bypass, false, false, false, true, false); 190 } 191 192 static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtility testUtil, 193 boolean exceedEnabled) throws Exception { 194 triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true); 195 } 196 197 private static void triggerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass, 198 boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter, 199 boolean exceedThrottleQuota, final TableName... tables) throws Exception { 200 envEdge.incValue(2 * REFRESH_TIME); 201 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 202 RegionServerRpcQuotaManager quotaManager = 203 rst.getRegionServer().getRegionServerRpcQuotaManager(); 204 QuotaCache quotaCache = quotaManager.getQuotaCache(); 205 quotaCache.triggerCacheRefresh(); 206 Thread.sleep(250); 207 testUtil.waitFor(60000, 250, new ExplainingPredicate<Exception>() { 208 209 @Override 210 public boolean evaluate() throws Exception { 211 boolean isUpdated = true; 212 for (TableName table : tables) { 213 if (userLimiter) { 214 boolean isUserBypass = 215 quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass(); 216 if (isUserBypass != bypass) { 217 LOG.info( 218 "User limiter for user={}, table={} not refreshed, bypass expected {}, actual {}", 219 User.getCurrent(), table, bypass, isUserBypass); 220 envEdge.incValue(100); 221 isUpdated = false; 222 break; 223 } 224 } 225 if (tableLimiter) { 226 boolean isTableBypass = quotaCache.getTableLimiter(table).isBypass(); 227 if (isTableBypass != bypass) { 228 LOG.info("Table limiter for table={} not refreshed, bypass expected {}, actual {}", 229 table, bypass, isTableBypass); 230 envEdge.incValue(100); 231 isUpdated = false; 232 break; 233 } 234 } 235 if (nsLimiter) { 236 boolean isNsBypass = 237 quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass(); 238 if (isNsBypass != bypass) { 239 LOG.info( 240 "Namespace limiter for namespace={} not refreshed, bypass expected {}, actual {}", 241 table.getNamespaceAsString(), bypass, isNsBypass); 242 envEdge.incValue(100); 243 isUpdated = false; 244 break; 245 } 246 } 247 } 248 if (rsLimiter) { 249 boolean rsIsBypass = quotaCache 250 .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass(); 251 if (rsIsBypass != bypass) { 252 LOG.info("RegionServer limiter not refreshed, bypass expected {}, actual {}", bypass, 253 rsIsBypass); 254 envEdge.incValue(100); 255 isUpdated = false; 256 } 257 } 258 if (exceedThrottleQuota) { 259 if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) { 260 LOG.info("ExceedThrottleQuotaEnabled not refreshed, bypass expected {}, actual {}", 261 bypass, quotaCache.isExceedThrottleQuotaEnabled()); 262 envEdge.incValue(100); 263 isUpdated = false; 264 } 265 } 266 if (isUpdated) { 267 return true; 268 } 269 quotaCache.triggerCacheRefresh(); 270 return false; 271 } 272 273 @Override 274 public String explainFailure() throws Exception { 275 return "Quota cache is still not refreshed"; 276 } 277 }); 278 279 LOG.debug("QuotaCache"); 280 LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache())); 281 LOG.debug(Objects.toString(quotaCache.getTableQuotaCache())); 282 LOG.debug(Objects.toString(quotaCache.getUserQuotaCache())); 283 LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache())); 284 } 285 } 286 287 static Set<QuotaCache> getQuotaCaches(HBaseTestingUtility testUtil) { 288 Set<QuotaCache> quotaCaches = new HashSet<>(); 289 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 290 RegionServerRpcQuotaManager quotaManager = 291 rst.getRegionServer().getRegionServerRpcQuotaManager(); 292 quotaCaches.add(quotaManager.getQuotaCache()); 293 } 294 return quotaCaches; 295 } 296 297 static void waitMinuteQuota() { 298 envEdge.incValue(70000); 299 } 300 301 static void clearQuotaCache(HBaseTestingUtility testUtil) { 302 for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) { 303 RegionServerRpcQuotaManager quotaManager = 304 rst.getRegionServer().getRegionServerRpcQuotaManager(); 305 QuotaCache quotaCache = quotaManager.getQuotaCache(); 306 quotaCache.getNamespaceQuotaCache().clear(); 307 quotaCache.getTableQuotaCache().clear(); 308 quotaCache.getUserQuotaCache().clear(); 309 quotaCache.getRegionServerQuotaCache().clear(); 310 } 311 } 312}