/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Measures how long async client get, batch and scan calls take when the region server answers
 * with an {@link RpcThrottlingException} that advertises a wait interval.
 * <p>
 * A throttled call is expected to take longer than {@link #WAIT_INTERVAL_NANOS}; an unthrottled
 * call, or one whose timeout is far shorter than the wait interval, is expected to take no longer
 * than it. The "does not multiply" tests additionally force a number of retries before the
 * throttle and assert the total cost stays below {@link #MAX_MULTIPLIER_EXPECTATION} wait
 * intervals.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPauseForRpcThrottling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("RpcThrottling");

  private static final byte[] FAMILY = Bytes.toBytes("Family");

  private static final byte[] QUALIFIER = Bytes.toBytes("Qualifier");

  /** Number of rows written by {@link #setUp()} and read back by the scan tests. */
  private static final int ROW_COUNT = 100;

  /** Number of extra rows written by each batch test, starting at row {@link #ROW_COUNT}. */
  private static final int BATCH_SIZE = 10;

  private static AsyncConnection CONN;

  /** One-shot flag: when set, the next intercepted RPC is rejected as throttled. */
  private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);

  /** Number of upcoming intercepted RPCs to reject with a {@link RegionTooBusyException}. */
  private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);

  /** Wait interval advertised in the message of the injected throttling exception. */
  private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);

  private static final int RETRY_COUNT = 3;

  private static final int MAX_MULTIPLIER_EXPECTATION = 2;

  /**
   * RPC services that can reject get, multi and scan calls with injected failures before
   * delegating to the real implementation.
   */
  public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {

    public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
      super(rs);
    }

    @Override
    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
      throws ServiceException {
      maybeForceRetry();
      maybeThrottle();
      return super.get(controller, request);
    }

    @Override
    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
      throws ServiceException {
      maybeForceRetry();
      maybeThrottle();
      return super.multi(rpcc, request);
    }

    @Override
    public ClientProtos.ScanResponse scan(RpcController controller,
      ClientProtos.ScanRequest request) throws ServiceException {
      maybeForceRetry();
      maybeThrottle();
      return super.scan(controller, request);
    }

    /**
     * Rejects the call with a {@link RegionTooBusyException} while forced retries remain.
     * @throws ServiceException wrapping the {@link RegionTooBusyException} when a forced retry was
     *                          consumed
     */
    private void maybeForceRetry() throws ServiceException {
      // Decrement-if-positive in a single atomic step so that concurrent handlers can never
      // consume more retries than were requested or push the counter below zero.
      int remainingBefore = FORCE_RETRIES.getAndUpdate(n -> n > 0 ? n - 1 : n);
      if (remainingBefore > 0) {
        throw new ServiceException(new RegionTooBusyException("Retry"));
      }
    }

    /**
     * Rejects the call with an {@link RpcThrottlingException} if the one-shot throttle flag is
     * set, clearing the flag in the process.
     * @throws ServiceException wrapping the {@link RpcThrottlingException} when the flag was set
     */
    private void maybeThrottle() throws ServiceException {
      // compareAndSet guarantees that exactly one call observes and clears the flag, even when
      // several handlers race; a separate get()/set() pair could throttle more than one call.
      if (THROTTLE.compareAndSet(true, false)) {
        throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
          + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
      }
    }
  }

  /** Region server that installs {@link ThrottlingRSRpcServicesForTest} as its RPC services. */
  public static final class ThrottlingRegionServerForTest extends HRegionServer {

    public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new ThrottlingRSRpcServicesForTest(this);
    }
  }

  @BeforeClass
  public static void setUp() throws Exception {
    assertTrue(
      "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
        + "in order for our tests to adequately verify that we aren't "
        + "multiplying throttled pauses based on the retry count.",
      MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);

    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    UTIL.startMiniCluster(1);
    // The implementation class is switched only after the cluster is up, so just the additional
    // region server started below uses the throttling implementation.
    UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
      ThrottlingRegionServerForTest.class, HRegionServer.class);
    HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();

    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
      UTIL.waitTableAvailable(TABLE_NAME);
      for (int i = 0; i < ROW_COUNT; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
      }
    }

    // Host the table's first region on the throttling region server so that the client calls
    // issued by the tests go through the intercepting RPC services.
    UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
      regionServer.getServerName());
    Configuration conf = new Configuration(UTIL.getConfiguration());
    CONN = ConnectionFactory.createAsyncConnection(conf).get();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    try {
      UTIL.getAdmin().disableTable(TABLE_NAME);
      UTIL.getAdmin().deleteTable(TABLE_NAME);
    } finally {
      // Always release the connection and the cluster, even if table cleanup failed.
      try {
        Closeables.close(CONN, true);
      } finally {
        UTIL.shutdownMiniCluster();
      }
    }
  }

  /**
   * Clears the injected-failure state so that a test which failed or timed out before the server
   * consumed it cannot influence the next test.
   */
  @After
  public void resetInjectedFailures() {
    THROTTLE.set(false);
    FORCE_RETRIES.set(0);
  }

  /**
   * Runs {@code callable} and asserts on its wall-clock cost.
   * @param callable  the operation to time
   * @param time      the threshold in nanoseconds
   * @param isGreater if {@code true} the cost must be strictly greater than {@code time},
   *                  otherwise it must be less than or equal to {@code time}
   */
  private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
    long costNs = getCostNs(callable);
    if (isGreater) {
      assertTrue("Expected call to take more than " + time + "ns but it took " + costNs + "ns",
        costNs > time);
    } else {
      assertTrue("Expected call to take at most " + time + "ns but it took " + costNs + "ns",
        costNs <= time);
    }
  }

  /**
   * Runs {@code callable} and asserts that its wall-clock cost lies strictly between
   * {@code minNs} and {@code maxNs}.
   */
  private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
    long costNs = getCostNs(callable);
    assertTrue("Expected call to take more than " + minNs + "ns but it took " + costNs + "ns",
      costNs > minNs);
    assertTrue("Expected call to take less than " + maxNs + "ns but it took " + costNs + "ns",
      costNs < maxNs);
  }

  /** Returns the elapsed {@link System#nanoTime()} time spent in {@code callable}. */
  private long getCostNs(Callable<Void> callable) throws Exception {
    long startNs = System.nanoTime();
    callable.call();
    return System.nanoTime() - startNs;
  }

  /**
   * Buffers {@link #BATCH_SIZE} puts, starting at row {@link #ROW_COUNT}, through
   * {@code mutator} and then closes it.
   * @param mutator the mutator to write through; this method takes ownership and closes it
   * @return a future that completes once every buffered put has completed
   */
  private static CompletableFuture<Void> submitBatch(AsyncBufferedMutator mutator)
    throws IOException {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    try (AsyncBufferedMutator owned = mutator) {
      for (int i = ROW_COUNT; i < ROW_COUNT + BATCH_SIZE; i++) {
        futures.add(
          owned.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
      }
    }
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
  }

  /**
   * Reads {@link #ROW_COUNT} results from {@code scanner} and asserts that the i-th result holds
   * the value {@code Bytes.toBytes(i)}.
   */
  private static void assertScanReturnsRows(ResultScanner scanner) throws IOException {
    for (int i = 0; i < ROW_COUNT; i++) {
      Result result = scanner.next();
      assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
    }
  }

  @Test
  public void itWaitsForThrottledGet() throws Exception {
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
    assertTime(() -> {
      table.get(new Get(Bytes.toBytes(0))).get();
      return null;
    }, WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForUnthrottledGet() throws Exception {
    boolean isThrottled = false;
    THROTTLE.set(isThrottled);
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
    assertTime(() -> {
      table.get(new Get(Bytes.toBytes(0))).get();
      return null;
    }, WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
    AsyncTable<AdvancedScanResultConsumer> table =
      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    assertTime(() -> {
      assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
      return null;
    }, WAIT_INTERVAL_NANOS, false);
  }

  @Test
  public void itDoesNotMultiplyThrottledGetWait() throws Exception {
    THROTTLE.set(true);
    FORCE_RETRIES.set(RETRY_COUNT);

    AsyncTable<AdvancedScanResultConsumer> table =
      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();

    assertTimeBetween(() -> {
      table.get(new Get(Bytes.toBytes(0))).get();
      return null;
    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
  }

  @Test
  public void itWaitsForThrottledBatch() throws Exception {
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    assertTime(() -> submitBatch(CONN.getBufferedMutator(TABLE_NAME)).get(),
      WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForUnthrottledBatch() throws Exception {
    boolean isThrottled = false;
    THROTTLE.set(isThrottled);
    assertTime(() -> submitBatch(CONN.getBufferedMutator(TABLE_NAME)).get(),
      WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    assertTime(() -> {
      CompletableFuture<Void> batch = submitBatch(CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setOperationTimeout(1, TimeUnit.MILLISECONDS).build());
      assertThrows(ExecutionException.class, () -> batch.get());
      return null;
    }, WAIT_INTERVAL_NANOS, false);
  }

  @Test
  public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
    THROTTLE.set(true);
    FORCE_RETRIES.set(RETRY_COUNT);

    assertTimeBetween(() -> {
      submitBatch(
        CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
          .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()).get();
      return null;
    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
  }

  @Test
  public void itWaitsForThrottledScan() throws Exception {
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    assertTime(() -> {
      try (
        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
        assertScanReturnsRows(scanner);
      }
      return null;
    }, WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForUnthrottledScan() throws Exception {
    boolean isThrottled = false;
    THROTTLE.set(isThrottled);
    assertTime(() -> {
      try (
        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
        assertScanReturnsRows(scanner);
      }
      return null;
    }, WAIT_INTERVAL_NANOS, isThrottled);
  }

  @Test
  public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
    AsyncTable<AdvancedScanResultConsumer> table =
      CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
    boolean isThrottled = true;
    THROTTLE.set(isThrottled);
    assertTime(() -> {
      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
        for (int i = 0; i < ROW_COUNT; i++) {
          assertThrows(RetriesExhaustedException.class, scanner::next);
        }
      }
      return null;
    }, WAIT_INTERVAL_NANOS, false);
  }

  @Test
  public void itDoesNotMultiplyThrottledScanWait() throws Exception {
    THROTTLE.set(true);
    FORCE_RETRIES.set(RETRY_COUNT);

    AsyncTable<AdvancedScanResultConsumer> table =
      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();

    assertTimeBetween(() -> {
      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
        assertScanReturnsRows(scanner);
      }
      return null;
    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
  }
}