001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Map.Entry; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeSet; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.Future; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.CallDroppedException; 052import org.apache.hadoop.hbase.CallQueueTooBigException; 053import org.apache.hadoop.hbase.Cell; 054import org.apache.hadoop.hbase.HBaseClassTestRule; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HBaseServerException; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionInfo; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.RegionLocations; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; 064import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 065import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 066import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 067import org.apache.hadoop.hbase.client.coprocessor.Batch; 068import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 069import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 070import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 071import org.apache.hadoop.hbase.security.User; 072import org.apache.hadoop.hbase.testclassification.ClientTests; 073import org.apache.hadoop.hbase.testclassification.LargeTests; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.apache.hadoop.hbase.util.Threads; 077import org.junit.Assert; 078import org.junit.Before; 079import org.junit.ClassRule; 080import org.junit.Test; 081import org.junit.experimental.categories.Category; 082import org.mockito.Mockito; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 087 088@Category({ ClientTests.class, LargeTests.class }) 089public class TestAsyncProcess { 090 091 @ClassRule 092 public static final HBaseClassTestRule CLASS_RULE = 093 HBaseClassTestRule.forClass(TestAsyncProcess.class); 094 095 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class); 096 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 097 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 098 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 099 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 100 private static final byte[] FAILS = Bytes.toBytes("FAILS"); 101 private Configuration CONF; 102 private ConnectionConfiguration CONNECTION_CONFIG; 103 private static final ServerName sn = ServerName.valueOf("s1,1,1"); 104 private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); 105 private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); 106 private static final HRegionInfo hri1 = 107 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 108 private static final HRegionInfo hri2 = 109 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 110 private static final HRegionInfo hri3 = 111 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 112 private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); 113 private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); 114 private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); 115 116 // Replica stuff 117 private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); 118 private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); 119 private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); 120 private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), 121 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); 122 private static final RegionLocations hrls2 = 123 new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3)); 124 private static final RegionLocations hrls3 = 125 new RegionLocations(new HRegionLocation(hri3, sn3), null); 126 127 private static final String success = "success"; 128 private static Exception failure = new Exception("failure"); 129 130 private static final int NB_RETRIES = 3; 131 132 private int RPC_TIMEOUT; 133 private int OPERATION_TIMEOUT; 134 135 @Before 136 public void beforeEach() { 137 this.CONF = new Configuration(); 138 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); 139 this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); 140 this.RPC_TIMEOUT = 141 CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 142 this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 143 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 144 } 145 146 static class CountingThreadFactory implements ThreadFactory { 147 final AtomicInteger nbThreads; 148 ThreadFactory realFactory = 149 new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d") 150 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(); 151 152 @Override 153 public Thread newThread(Runnable r) { 154 nbThreads.incrementAndGet(); 155 return realFactory.newThread(r); 156 } 157 158 CountingThreadFactory(AtomicInteger nbThreads) { 159 this.nbThreads = nbThreads; 160 } 161 } 162 163 static class MyAsyncProcess extends AsyncProcess { 164 final AtomicInteger nbMultiResponse = new AtomicInteger(); 165 final AtomicInteger nbActions = new AtomicInteger(); 166 public List<AsyncRequestFuture> allReqs = new ArrayList<>(); 167 public AtomicInteger callsCt = new AtomicInteger(); 168 private Configuration conf; 169 170 private long previousTimeout = -1; 171 final ExecutorService service; 172 173 @Override 174 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(AsyncProcessTask task, 175 List<Action> actions, long nonceGroup) { 176 // Test HTable has tableName of null, so pass DUMMY_TABLE 177 AsyncProcessTask wrap = new AsyncProcessTask(task) { 178 @Override 179 public TableName getTableName() { 180 return DUMMY_TABLE; 181 } 182 }; 183 AsyncRequestFutureImpl<Res> r = 184 new MyAsyncRequestFutureImpl<>(wrap, actions, nonceGroup, this); 185 allReqs.add(r); 186 return r; 187 } 188 189 public MyAsyncProcess(ClusterConnection hc, Configuration conf) { 190 super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), 191 new RpcControllerFactory(conf)); 192 service = Executors.newFixedThreadPool(5); 193 this.conf = conf; 194 } 195 196 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { 197 super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), 198 new RpcControllerFactory(conf)); 199 service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), 200 new CountingThreadFactory(nbThreads)); 201 } 202 203 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, 204 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 205 boolean needResults) throws InterruptedIOException { 206 AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) 207 .setPool(pool == null ? service : pool).setTableName(tableName).setRowAccess(rows) 208 .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) 209 .setNeedResults(needResults) 210 .setRpcTimeout( 211 conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) 212 .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 213 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) 214 .build(); 215 return submit(task); 216 } 217 218 public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows, 219 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) 220 throws InterruptedIOException { 221 return submit(null, tableName, rows, atLeastOne, callback, needResults); 222 } 223 224 @Override 225 public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) 226 throws InterruptedIOException { 227 previousTimeout = task.getRpcTimeout(); 228 // We use results in tests to check things, so override to always save them. 229 AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { 230 @Override 231 public boolean getNeedResults() { 232 return true; 233 } 234 }; 235 return super.submit(wrap); 236 } 237 238 @Override 239 protected RpcRetryingCaller<AbstractResponse> 240 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 241 callsCt.incrementAndGet(); 242 MultiServerCallable callable1 = (MultiServerCallable) callable; 243 final MultiResponse mr = createMultiResponse(callable1.getMulti(), nbMultiResponse, nbActions, 244 new ResponseGenerator() { 245 @Override 246 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 247 if (Arrays.equals(FAILS, a.getAction().getRow())) { 248 mr.add(regionName, a.getOriginalIndex(), failure); 249 } else { 250 mr.add(regionName, a.getOriginalIndex(), success); 251 } 252 } 253 }); 254 255 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 256 RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { 257 @Override 258 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 259 int callTimeout) throws IOException, RuntimeException { 260 try { 261 // sleep one second in order for threadpool to start another thread instead of reusing 262 // existing one. 263 Thread.sleep(1000); 264 } catch (InterruptedException e) { 265 // ignore error 266 } 267 return mr; 268 } 269 }; 270 } 271 272 } 273 274 static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { 275 private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); 276 277 public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup, 278 AsyncProcess asyncProcess) { 279 super(task, actions, nonceGroup, asyncProcess); 280 } 281 282 @Override 283 protected void updateStats(ServerName server, MultiResponse resp) { 284 // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. 285 } 286 287 Map<ServerName, List<Long>> getRequestHeapSize() { 288 return heapSizesByServer; 289 } 290 291 @Override 292 SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, 293 ServerName server, Set<CancellableRegionServerCallable> callsInProgress) { 294 SingleServerRequestRunnable rq = 295 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); 296 List<Long> heapCount = heapSizesByServer.get(server); 297 if (heapCount == null) { 298 heapCount = new ArrayList<>(); 299 heapSizesByServer.put(server, heapCount); 300 } 301 heapCount.add(heapSizeOf(multiAction)); 302 return rq; 303 } 304 305 private long heapSizeOf(MultiAction multiAction) { 306 return multiAction.actions.values().stream().flatMap(v -> v.stream()) 307 .map(action -> action.getAction()).filter(row -> row instanceof Mutation) 308 .mapToLong(row -> ((Mutation) row).heapSize()).sum(); 309 } 310 } 311 312 static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> { 313 314 private final IOException e; 315 316 public CallerWithFailure(IOException e) { 317 super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null); 318 this.e = e; 319 } 320 321 @Override 322 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 323 int callTimeout) throws IOException, RuntimeException { 324 throw e; 325 } 326 } 327 328 /** 329 * Used to simulate the case where a RegionServer responds to a multi request, but some or all of 330 * the actions have an Exception instead of Result. These responses go through receiveMultiAction, 331 * which has handling for individual action failures. 332 */ 333 static class CallerWithRegionException extends RpcRetryingCallerImpl<AbstractResponse> { 334 335 private final IOException e; 336 private MultiAction multi; 337 338 public CallerWithRegionException(IOException e, MultiAction multi) { 339 super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null); 340 this.e = e; 341 this.multi = multi; 342 } 343 344 @Override 345 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 346 int callTimeout) throws IOException, RuntimeException { 347 MultiResponse response = new MultiResponse(); 348 for (Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 349 response.addException(entry.getKey(), e); 350 } 351 return response; 352 } 353 } 354 355 static class AsyncProcessWithFailure extends MyAsyncProcess { 356 357 private final IOException ioe; 358 private final ServerName failingServer; 359 private final boolean returnAsRegionException; 360 361 public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe, 362 ServerName failingServer, boolean returnAsRegionException) { 363 super(hc, myConf); 364 this.ioe = ioe; 365 this.failingServer = failingServer; 366 this.returnAsRegionException = returnAsRegionException; 367 serverTrackerTimeout = 1L; 368 } 369 370 public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe) { 371 this(hc, myConf, ioe, null, false); 372 } 373 374 @Override 375 protected RpcRetryingCaller<AbstractResponse> 376 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 377 MultiServerCallable msc = (MultiServerCallable) callable; 378 if (failingServer != null) { 379 if (!msc.getServerName().equals(failingServer)) { 380 return super.createCaller(callable, rpcTimeout); 381 } 382 } 383 384 if (returnAsRegionException) { 385 return new CallerWithRegionException(ioe, msc.getMulti()); 386 } 387 388 callsCt.incrementAndGet(); 389 return new CallerWithFailure(ioe); 390 } 391 } 392 393 /** 394 * Make the backoff time always different on each call. 395 */ 396 static class MyClientBackoffPolicy implements ClientBackoffPolicy { 397 private final Map<ServerName, AtomicInteger> count = new HashMap<>(); 398 399 @Override 400 public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { 401 AtomicInteger inc = count.get(serverName); 402 if (inc == null) { 403 inc = new AtomicInteger(0); 404 count.put(serverName, inc); 405 } 406 return inc.getAndIncrement(); 407 } 408 } 409 410 static class MyAsyncProcessWithReplicas extends MyAsyncProcess { 411 private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator()); 412 private long primarySleepMs = 0, replicaSleepMs = 0; 413 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>(); 414 private final AtomicLong replicaCalls = new AtomicLong(0); 415 416 public void addFailures(RegionInfo... hris) { 417 for (RegionInfo hri : hris) { 418 failures.add(hri.getRegionName()); 419 } 420 } 421 422 public long getReplicaCallCount() { 423 return replicaCalls.get(); 424 } 425 426 public void setPrimaryCallDelay(ServerName server, long primaryMs) { 427 customPrimarySleepMs.put(server, primaryMs); 428 } 429 430 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { 431 super(hc, conf); 432 } 433 434 public void setCallDelays(long primaryMs, long replicaMs) { 435 this.primarySleepMs = primaryMs; 436 this.replicaSleepMs = replicaMs; 437 } 438 439 @Override 440 protected RpcRetryingCaller<AbstractResponse> 441 createCaller(CancellableRegionServerCallable payloadCallable, int rpcTimeout) { 442 MultiServerCallable callable = (MultiServerCallable) payloadCallable; 443 final MultiResponse mr = createMultiResponse(callable.getMulti(), nbMultiResponse, nbActions, 444 new ResponseGenerator() { 445 @Override 446 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 447 if (failures.contains(regionName)) { 448 mr.add(regionName, a.getOriginalIndex(), failure); 449 } else { 450 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); 451 mr.add(regionName, a.getOriginalIndex(), Result.create(new Cell[0], null, isStale)); 452 } 453 } 454 }); 455 // Currently AsyncProcess either sends all-replica, or all-primary request. 456 final boolean isDefault = RegionReplicaUtil.isDefaultReplica( 457 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); 458 final ServerName server = ((MultiServerCallable) callable).getServerName(); 459 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " 460 + callable.getMulti().actions.size() + " entries: "; 461 for (byte[] region : callable.getMulti().actions.keySet()) { 462 debugMsg += "[" + Bytes.toStringBinary(region) + "], "; 463 } 464 LOG.debug(debugMsg); 465 if (!isDefault) { 466 replicaCalls.incrementAndGet(); 467 } 468 469 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 470 RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { 471 @Override 472 public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 473 int callTimeout) throws IOException, RuntimeException { 474 long sleep = -1; 475 if (isDefault) { 476 Long customSleep = customPrimarySleepMs.get(server); 477 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); 478 } else { 479 sleep = replicaSleepMs; 480 } 481 if (sleep != 0) { 482 try { 483 Thread.sleep(sleep); 484 } catch (InterruptedException e) { 485 // Restore interrupt status 486 Thread.currentThread().interrupt(); 487 } 488 } 489 return mr; 490 } 491 }; 492 } 493 } 494 495 static MultiResponse createMultiResponse(final MultiAction multi, AtomicInteger nbMultiResponse, 496 AtomicInteger nbActions, ResponseGenerator gen) { 497 final MultiResponse mr = new MultiResponse(); 498 nbMultiResponse.incrementAndGet(); 499 for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 500 byte[] regionName = entry.getKey(); 501 for (Action a : entry.getValue()) { 502 nbActions.incrementAndGet(); 503 gen.addResponse(mr, regionName, a); 504 } 505 } 506 return mr; 507 } 508 509 private static interface ResponseGenerator { 510 void addResponse(final MultiResponse mr, byte[] regionName, Action a); 511 } 512 513 /** 514 * Returns our async process. 515 */ 516 static class MyConnectionImpl extends ConnectionImplementation { 517 public static class TestRegistry extends DoNothingConnectionRegistry { 518 519 public TestRegistry(Configuration conf, User user) { 520 super(conf, user); 521 } 522 523 @Override 524 public CompletableFuture<String> getClusterId() { 525 return CompletableFuture.completedFuture("testClusterId"); 526 } 527 } 528 529 final AtomicInteger nbThreads = new AtomicInteger(0); 530 531 protected MyConnectionImpl(Configuration conf) throws IOException { 532 super(setupConf(conf), null, null, Collections.emptyMap()); 533 } 534 535 private static Configuration setupConf(Configuration conf) { 536 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, TestRegistry.class, 537 ConnectionRegistry.class); 538 return conf; 539 } 540 541 @Override 542 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 543 boolean retry, int replicaId) throws IOException { 544 return new RegionLocations(loc1); 545 } 546 547 @Override 548 public boolean hasCellBlockSupport() { 549 return false; 550 } 551 } 552 553 /** 554 * Returns our async process. 555 */ 556 static class MyConnectionImpl2 extends MyConnectionImpl { 557 List<HRegionLocation> hrl; 558 final boolean usedRegions[]; 559 560 protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException { 561 super(conf); 562 this.hrl = hrl; 563 this.usedRegions = new boolean[hrl.size()]; 564 } 565 566 @Override 567 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 568 boolean retry, int replicaId) throws IOException { 569 int i = 0; 570 for (HRegionLocation hr : hrl) { 571 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { 572 usedRegions[i] = true; 573 return new RegionLocations(hr); 574 } 575 i++; 576 } 577 return null; 578 } 579 } 580 581 @Test 582 public void testListRowAccess() { 583 int count = 10; 584 List<String> values = new ArrayList<>(); 585 for (int i = 0; i != count; ++i) { 586 values.add(String.valueOf(i)); 587 } 588 589 ListRowAccess<String> taker = new ListRowAccess<>(values); 590 assertEquals(count, taker.size()); 591 592 int takeCount = 0; 593 Iterator<String> it = taker.iterator(); 594 while (it.hasNext()) { 595 String v = it.next(); 596 assertEquals(String.valueOf(takeCount), v); 597 ++takeCount; 598 it.remove(); 599 if (Math.random() >= 0.5) { 600 break; 601 } 602 } 603 assertEquals(count, taker.size() + takeCount); 604 605 it = taker.iterator(); 606 while (it.hasNext()) { 607 String v = it.next(); 608 assertEquals(String.valueOf(takeCount), v); 609 ++takeCount; 610 it.remove(); 611 } 612 assertEquals(0, taker.size()); 613 assertEquals(count, takeCount); 614 } 615 616 private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { 617 if (putSizePerServer <= maxHeapSizePerRequest) { 618 return 1; 619 } else if (putSizePerServer % maxHeapSizePerRequest == 0) { 620 return putSizePerServer / maxHeapSizePerRequest; 621 } else { 622 return putSizePerServer / maxHeapSizePerRequest + 1; 623 } 624 } 625 626 @Test 627 public void testSubmitSameSizeOfRequest() throws Exception { 628 long writeBuffer = 2 * 1024 * 1024; 629 long putsHeapSize = writeBuffer; 630 doSubmitRequest(writeBuffer, putsHeapSize); 631 } 632 633 @Test 634 public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { 635 long maxHeapSizePerRequest = Long.MAX_VALUE; 636 long putsHeapSize = 2 * 1024 * 1024; 637 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 638 } 639 640 @Test 641 public void testSubmitRandomSizeRequest() throws Exception { 642 Random rn = new Random(); 643 final long limit = 10 * 1024 * 1024; 644 final int requestCount = 1 + (int) (rn.nextDouble() * 3); 645 long n = rn.nextLong(); 646 if (n < 0) { 647 n = -n; 648 } else if (n == 0) { 649 n = 1; 650 } 651 long putsHeapSize = n % limit; 652 long maxHeapSizePerRequest = putsHeapSize / requestCount; 653 LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest 654 + ", putsHeapSize=" + putsHeapSize); 655 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 656 } 657 658 @Test 659 public void testSubmitSmallRequest() throws Exception { 660 long maxHeapSizePerRequest = 2 * 1024 * 1024; 661 long putsHeapSize = 100; 662 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 663 } 664 665 @Test 666 public void testSubmitLargeRequest() throws Exception { 667 long maxHeapSizePerRequest = 2 * 1024 * 1024; 668 long putsHeapSize = maxHeapSizePerRequest * 2; 669 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 670 } 671 672 private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { 673 ClusterConnection conn = createHConnection(); 674 final String defaultClazz = 675 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 676 final long defaultHeapSizePerRequest = 677 conn.getConfiguration().getLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 678 SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 679 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 680 SimpleRequestController.class.getName()); 681 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 682 maxHeapSizePerRequest); 683 684 // sn has two regions 685 long putSizeSN = 0; 686 long putSizeSN2 = 0; 687 List<Put> puts = new ArrayList<>(); 688 while ((putSizeSN + putSizeSN2) <= putsHeapSize) { 689 Put put1 = new Put(DUMMY_BYTES_1); 690 put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 691 Put put2 = new Put(DUMMY_BYTES_2); 692 put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 693 Put put3 = new Put(DUMMY_BYTES_3); 694 put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); 695 putSizeSN += (put1.heapSize() + put2.heapSize()); 696 putSizeSN2 += put3.heapSize(); 697 puts.add(put1); 698 puts.add(put2); 699 puts.add(put3); 700 } 701 702 int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); 703 int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); 704 LOG.info("Total put count:" + puts.size() + ", putSizeSN:" + putSizeSN + ", putSizeSN2:" 705 + putSizeSN2 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" 706 + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); 707 708 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 709 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 710 try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { 711 mutator.mutate(puts); 712 mutator.flush(); 713 List<AsyncRequestFuture> reqs = ap.allReqs; 714 715 int actualSnReqCount = 0; 716 int actualSn2ReqCount = 0; 717 for (AsyncRequestFuture req : reqs) { 718 if (!(req instanceof AsyncRequestFutureImpl)) { 719 continue; 720 } 721 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 722 if (ars.getRequestHeapSize().containsKey(sn)) { 723 ++actualSnReqCount; 724 } 725 if (ars.getRequestHeapSize().containsKey(sn2)) { 726 ++actualSn2ReqCount; 727 } 728 } 729 // If the server is busy, the actual count may be incremented. 730 assertEquals(true, minCountSnRequest <= actualSnReqCount); 731 assertEquals(true, minCountSn2Request <= actualSn2ReqCount); 732 Map<ServerName, Long> sizePerServers = new HashMap<>(); 733 for (AsyncRequestFuture req : reqs) { 734 if (!(req instanceof AsyncRequestFutureImpl)) { 735 continue; 736 } 737 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 738 Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); 739 for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { 740 long sum = 0; 741 for (long size : entry.getValue()) { 742 assertEquals(true, size <= maxHeapSizePerRequest); 743 sum += size; 744 } 745 assertEquals(true, sum <= maxHeapSizePerRequest); 746 long value = sizePerServers.getOrDefault(entry.getKey(), 0L); 747 sizePerServers.put(entry.getKey(), value + sum); 748 } 749 } 750 assertEquals(true, sizePerServers.containsKey(sn)); 751 assertEquals(true, sizePerServers.containsKey(sn2)); 752 assertEquals(false, sizePerServers.containsKey(sn3)); 753 assertEquals(putSizeSN, (long) sizePerServers.get(sn)); 754 assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); 755 } 756 // restore config. 757 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 758 defaultHeapSizePerRequest); 759 if (defaultClazz != null) { 760 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 761 defaultClazz); 762 } 763 } 764 765 @Test 766 public void testSubmit() throws Exception { 767 ClusterConnection hc = createHConnection(); 768 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 769 770 List<Put> puts = new ArrayList<>(1); 771 puts.add(createPut(1, true)); 772 773 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 774 Assert.assertTrue(puts.isEmpty()); 775 } 776 777 @Test 778 public void testSubmitWithCB() throws Exception { 779 ClusterConnection hc = createHConnection(); 780 final AtomicInteger updateCalled = new AtomicInteger(0); 781 Batch.Callback<Object> cb = new Batch.Callback<Object>() { 782 @Override 783 public void update(byte[] region, byte[] row, Object result) { 784 updateCalled.incrementAndGet(); 785 } 786 }; 787 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 788 789 List<Put> puts = new ArrayList<>(1); 790 puts.add(createPut(1, true)); 791 792 final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); 793 Assert.assertTrue(puts.isEmpty()); 794 ars.waitUntilDone(); 795 Assert.assertEquals(1, updateCalled.get()); 796 } 797 798 @Test 799 public void testSubmitBusyRegion() throws Exception { 800 ClusterConnection conn = createHConnection(); 801 final String defaultClazz = 802 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 803 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 804 SimpleRequestController.class.getName()); 805 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 806 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 807 List<Put> puts = new ArrayList<>(1); 808 puts.add(createPut(1, true)); 809 810 for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { 811 ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 812 } 813 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 814 Assert.assertEquals(puts.size(), 1); 815 816 ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 817 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 818 Assert.assertEquals(0, puts.size()); 819 if (defaultClazz != null) { 820 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 821 defaultClazz); 822 } 823 } 824 825 @Test 826 public void testSubmitBusyRegionServer() throws Exception { 827 ClusterConnection conn = createHConnection(); 828 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 829 final String defaultClazz = 830 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 831 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 832 SimpleRequestController.class.getName()); 833 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 834 controller.taskCounterPerServer.put(sn2, 835 new AtomicInteger(controller.maxConcurrentTasksPerServer)); 836 837 List<Put> puts = new ArrayList<>(4); 838 puts.add(createPut(1, true)); 839 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy 840 puts.add(createPut(1, true)); // <== this one will make it, the region is already in 841 puts.add(createPut(2, true)); // <== new region, but the rs is ok 842 843 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 844 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 845 846 controller.taskCounterPerServer.put(sn2, 847 new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); 848 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 849 Assert.assertTrue(puts.isEmpty()); 850 if (defaultClazz != null) { 851 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 852 defaultClazz); 853 } 854 } 855 856 @Test 857 public void testFail() throws Exception { 858 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 859 860 List<Put> puts = new ArrayList<>(1); 861 Put p = createPut(1, false); 862 puts.add(p); 863 864 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 865 Assert.assertEquals(0, puts.size()); 866 ars.waitUntilDone(); 867 verifyResult(ars, false); 868 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 869 870 Assert.assertEquals(1, ars.getErrors().exceptions.size()); 871 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 872 failure.equals(ars.getErrors().exceptions.get(0))); 873 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 874 failure.equals(ars.getErrors().exceptions.get(0))); 875 876 Assert.assertEquals(1, ars.getFailedOperations().size()); 877 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), 878 p.equals(ars.getFailedOperations().get(0))); 879 } 880 881 @Test 882 public void testSubmitTrue() throws IOException { 883 ClusterConnection conn = createHConnection(); 884 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 885 final String defaultClazz = 886 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 887 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 888 SimpleRequestController.class.getName()); 889 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 890 controller.tasksInProgress.incrementAndGet(); 891 final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); 892 controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); 893 894 final AtomicBoolean checkPoint = new AtomicBoolean(false); 895 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); 896 897 Thread t = new Thread() { 898 @Override 899 public void run() { 900 Threads.sleep(1000); 901 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent 902 ai.decrementAndGet(); 903 controller.tasksInProgress.decrementAndGet(); 904 checkPoint2.set(true); 905 } 906 }; 907 908 List<Put> puts = new ArrayList<>(1); 909 Put p = createPut(1, true); 910 puts.add(p); 911 912 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 913 Assert.assertFalse(puts.isEmpty()); 914 915 t.start(); 916 917 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 918 Assert.assertTrue(puts.isEmpty()); 919 920 checkPoint.set(true); 921 while (!checkPoint2.get()) { 922 Threads.sleep(1); 923 } 924 if (defaultClazz != null) { 925 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 926 defaultClazz); 927 } 928 } 929 930 @Test 931 public void testFailAndSuccess() throws Exception { 932 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 933 934 List<Put> puts = new ArrayList<>(3); 935 puts.add(createPut(1, false)); 936 puts.add(createPut(1, true)); 937 puts.add(createPut(1, true)); 938 939 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 940 Assert.assertTrue(puts.isEmpty()); 941 ars.waitUntilDone(); 942 verifyResult(ars, false, true, true); 943 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 944 ap.callsCt.set(0); 945 Assert.assertEquals(1, ars.getErrors().actions.size()); 946 947 puts.add(createPut(1, true)); 948 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. 949 ap.waitForMaximumCurrentTasks(0, null); 950 ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 951 Assert.assertEquals(0, puts.size()); 952 ars.waitUntilDone(); 953 Assert.assertEquals(1, ap.callsCt.get()); 954 verifyResult(ars, true); 955 } 956 957 @Test 958 public void testFlush() throws Exception { 959 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 960 961 List<Put> puts = new ArrayList<>(3); 962 puts.add(createPut(1, false)); 963 puts.add(createPut(1, true)); 964 puts.add(createPut(1, true)); 965 966 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 967 ars.waitUntilDone(); 968 verifyResult(ars, false, true, true); 969 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 970 971 Assert.assertEquals(1, ars.getFailedOperations().size()); 972 } 973 974 @Test 975 public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { 976 ClusterConnection hc = createHConnection(); 977 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 978 testTaskCount(ap); 979 } 980 981 @Test 982 public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { 983 Configuration copyConf = new Configuration(CONF); 984 copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 985 MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); 986 ClusterConnection conn = createHConnection(); 987 Mockito.when(conn.getConfiguration()).thenReturn(copyConf); 988 Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); 989 Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); 990 final String defaultClazz = 991 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 992 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 993 SimpleRequestController.class.getName()); 994 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 995 testTaskCount(ap); 996 if (defaultClazz != null) { 997 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 998 defaultClazz); 999 } 1000 } 1001 1002 private void testTaskCount(MyAsyncProcess ap) 1003 throws InterruptedIOException, InterruptedException { 1004 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 1005 List<Put> puts = new ArrayList<>(); 1006 for (int i = 0; i != 3; ++i) { 1007 puts.add(createPut(1, true)); 1008 puts.add(createPut(2, true)); 1009 puts.add(createPut(3, true)); 1010 } 1011 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 1012 ap.waitForMaximumCurrentTasks(0, null); 1013 // More time to wait if there are incorrect task count. 1014 TimeUnit.SECONDS.sleep(1); 1015 assertEquals(0, controller.tasksInProgress.get()); 1016 for (AtomicInteger count : controller.taskCounterPerRegion.values()) { 1017 assertEquals(0, count.get()); 1018 } 1019 for (AtomicInteger count : controller.taskCounterPerServer.values()) { 1020 assertEquals(0, count.get()); 1021 } 1022 } 1023 1024 @Test 1025 public void testMaxTask() throws Exception { 1026 ClusterConnection conn = createHConnection(); 1027 final String defaultClazz = 1028 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 1029 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1030 SimpleRequestController.class.getName()); 1031 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1032 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 1033 1034 for (int i = 0; i < 1000; i++) { 1035 ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1036 } 1037 1038 final Thread myThread = Thread.currentThread(); 1039 1040 Thread t = new Thread() { 1041 @Override 1042 public void run() { 1043 Threads.sleep(2000); 1044 myThread.interrupt(); 1045 } 1046 }; 1047 1048 List<Put> puts = new ArrayList<>(1); 1049 puts.add(createPut(1, true)); 1050 1051 t.start(); 1052 1053 try { 1054 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 1055 Assert.fail("We should have been interrupted."); 1056 } catch (InterruptedIOException expected) { 1057 } 1058 1059 final long sleepTime = 2000; 1060 1061 Thread t2 = new Thread() { 1062 @Override 1063 public void run() { 1064 Threads.sleep(sleepTime); 1065 while (controller.tasksInProgress.get() > 0) { 1066 ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1067 } 1068 } 1069 }; 1070 t2.start(); 1071 1072 long start = EnvironmentEdgeManager.currentTime(); 1073 ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false); 1074 long end = EnvironmentEdgeManager.currentTime(); 1075 1076 // Adds 100 to secure us against approximate timing. 1077 Assert.assertTrue(start + 100L + sleepTime > end); 1078 if (defaultClazz != null) { 1079 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1080 defaultClazz); 1081 } 1082 } 1083 1084 private ClusterConnection createHConnection() throws IOException { 1085 return createHConnection(CONNECTION_CONFIG); 1086 } 1087 1088 private ClusterConnection createHConnection(ConnectionConfiguration configuration) 1089 throws IOException { 1090 ClusterConnection hc = createHConnectionCommon(configuration); 1091 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); 1092 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); 1093 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); 1094 Mockito 1095 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 1096 .thenReturn(Arrays.asList(loc1, loc2, loc3)); 1097 setMockLocation(hc, FAILS, new RegionLocations(loc2)); 1098 return hc; 1099 } 1100 1101 private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration) 1102 throws IOException { 1103 ClusterConnection hc = createHConnectionCommon(configuration); 1104 setMockLocation(hc, DUMMY_BYTES_1, hrls1); 1105 setMockLocation(hc, DUMMY_BYTES_2, hrls2); 1106 setMockLocation(hc, DUMMY_BYTES_3, hrls3); 1107 List<HRegionLocation> locations = new ArrayList<>(); 1108 for (HRegionLocation loc : hrls1.getRegionLocations()) { 1109 locations.add(loc); 1110 } 1111 for (HRegionLocation loc : hrls2.getRegionLocations()) { 1112 locations.add(loc); 1113 } 1114 for (HRegionLocation loc : hrls3.getRegionLocations()) { 1115 locations.add(loc); 1116 } 1117 Mockito 1118 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 1119 .thenReturn(locations); 1120 return hc; 1121 } 1122 1123 private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) 1124 throws IOException { 1125 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 1126 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 1127 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 1128 Mockito.anyBoolean())).thenReturn(result); 1129 } 1130 1131 private ClusterConnection 1132 createHConnectionCommon(ConnectionConfiguration connectionConfiguration) { 1133 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 1134 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 1135 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 1136 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 1137 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 1138 Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration); 1139 return hc; 1140 } 1141 1142 @Test 1143 public void testHTablePutSuccess() throws Exception { 1144 ClusterConnection conn = createHConnection(); 1145 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1146 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1147 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1148 1149 Put put = createPut(1, true); 1150 1151 Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), 1152 ht.getWriteBufferSize()); 1153 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1154 ht.mutate(put); 1155 ht.flush(); 1156 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1157 } 1158 1159 @Test 1160 public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { 1161 ClusterConnection conn = createHConnection(); 1162 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1163 1164 checkPeriodicFlushParameters(conn, ap, 1234, 1234, 1234, 1234); 1165 checkPeriodicFlushParameters(conn, ap, 0, 0, 0, 1166 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1167 checkPeriodicFlushParameters(conn, ap, -1234, 0, -1234, 1168 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1169 checkPeriodicFlushParameters(conn, ap, 1, 1, 1, 1170 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1171 } 1172 1173 private void checkPeriodicFlushParameters(ClusterConnection conn, MyAsyncProcess ap, long setTO, 1174 long expectTO, long setTT, long expectTT) { 1175 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1176 1177 // The BufferedMutatorParams does nothing with the value 1178 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); 1179 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); 1180 Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); 1181 Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); 1182 1183 // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) 1184 BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); 1185 Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); 1186 Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); 1187 1188 // The BufferedMutatorImpl corrects illegal values (direct via setter) 1189 BufferedMutatorImpl ht2 = 1190 new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); 1191 ht2.setWriteBufferPeriodicFlush(setTO, setTT); 1192 Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); 1193 Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); 1194 1195 } 1196 1197 @Test 1198 public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { 1199 ClusterConnection conn = createHConnection(); 1200 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1201 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1202 1203 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP 1204 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms 1205 bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record 1206 1207 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1208 1209 // Verify if BufferedMutator has the right settings. 1210 Assert.assertEquals(10000, ht.getWriteBufferSize()); 1211 Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); 1212 Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 1213 ht.getWriteBufferPeriodicFlushTimerTickMs()); 1214 1215 Put put = createPut(1, true); 1216 1217 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1218 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1219 1220 // ----- Insert, flush immediately, MUST NOT flush automatically 1221 ht.mutate(put); 1222 ht.flush(); 1223 1224 Thread.sleep(1000); 1225 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1226 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1227 1228 // ----- Insert, NO flush, MUST flush automatically 1229 ht.mutate(put); 1230 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1231 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1232 1233 // The timerTick should fire every 100ms, so after twice that we must have 1234 // seen at least 1 tick and we should see an automatic flush 1235 Thread.sleep(200); 1236 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1237 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1238 1239 // Ensure it does not flush twice 1240 Thread.sleep(200); 1241 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1242 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1243 1244 // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically 1245 ht.disableWriteBufferPeriodicFlush(); 1246 ht.mutate(put); 1247 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1248 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1249 1250 // Wait for at least 1 timerTick, we should see NO flushes. 1251 Thread.sleep(200); 1252 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1253 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1254 1255 // Reenable periodic flushing, a flush seems to take about 1 second 1256 // so we wait for 2 seconds and it should have finished the flush. 1257 ht.setWriteBufferPeriodicFlush(1, 100); 1258 Thread.sleep(2000); 1259 Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); 1260 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1261 } 1262 1263 @Test 1264 public void testBufferedMutatorImplWithSharedPool() throws Exception { 1265 ClusterConnection conn = createHConnection(); 1266 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1267 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1268 BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1269 1270 ht.close(); 1271 assertFalse(ap.service.isShutdown()); 1272 } 1273 1274 @Test 1275 public void testFailedPutAndNewPut() throws Exception { 1276 ClusterConnection conn = createHConnection(); 1277 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1278 BufferedMutatorParams bufferParam = 1279 createBufferedMutatorParams(ap, DUMMY_TABLE).writeBufferSize(0); 1280 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1281 1282 Put p = createPut(1, false); 1283 try { 1284 mutator.mutate(p); 1285 Assert.fail(); 1286 } catch (RetriesExhaustedWithDetailsException expected) { 1287 assertEquals(1, expected.getNumExceptions()); 1288 assertTrue(expected.getRow(0) == p); 1289 } 1290 // Let's do all the retries. 1291 ap.waitForMaximumCurrentTasks(0, null); 1292 Assert.assertEquals(0, mutator.size()); 1293 1294 // There is no global error so the new put should not fail 1295 mutator.mutate(createPut(1, true)); 1296 Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); 1297 } 1298 1299 @SuppressWarnings("SelfComparison") 1300 @Test 1301 public void testAction() { 1302 Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); 1303 Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1304 Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1305 Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); 1306 assertFalse(action_0.equals(action_1)); 1307 assertTrue(action_0.equals(action_0)); 1308 assertTrue(action_1.equals(action_2)); 1309 assertTrue(action_2.equals(action_1)); 1310 assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); 1311 assertTrue(action_2.equals(action_3)); 1312 assertFalse(action_0.equals(action_3)); 1313 assertEquals(0, action_0.compareTo(action_0)); 1314 assertTrue(action_0.compareTo(action_1) < 0); 1315 assertTrue(action_1.compareTo(action_0) > 0); 1316 assertEquals(0, action_1.compareTo(action_2)); 1317 } 1318 1319 @Test 1320 public void testBatch() throws IOException, InterruptedException { 1321 ClusterConnection conn = new MyConnectionImpl(CONF); 1322 HTable ht = (HTable) conn.getTable(DUMMY_TABLE); 1323 ht.multiAp = new MyAsyncProcess(conn, CONF); 1324 1325 List<Put> puts = new ArrayList<>(7); 1326 puts.add(createPut(1, true)); 1327 puts.add(createPut(1, true)); 1328 puts.add(createPut(1, true)); 1329 puts.add(createPut(1, true)); 1330 puts.add(createPut(1, false)); // <=== the bad apple, position 4 1331 puts.add(createPut(1, true)); 1332 puts.add(createPut(1, false)); // <=== another bad apple, position 6 1333 1334 Object[] res = new Object[puts.size()]; 1335 try { 1336 ht.batch(puts, res); 1337 Assert.fail(); 1338 } catch (RetriesExhaustedException expected) { 1339 } 1340 1341 Assert.assertEquals(success, res[0]); 1342 Assert.assertEquals(success, res[1]); 1343 Assert.assertEquals(success, res[2]); 1344 Assert.assertEquals(success, res[3]); 1345 Assert.assertEquals(failure, res[4]); 1346 Assert.assertEquals(success, res[5]); 1347 Assert.assertEquals(failure, res[6]); 1348 } 1349 1350 @Test 1351 public void testErrorsServers() throws IOException { 1352 Configuration configuration = new Configuration(CONF); 1353 ClusterConnection conn = new MyConnectionImpl(configuration); 1354 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); 1355 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1356 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1357 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); 1358 1359 Assert.assertNotNull(ap.createServerErrorTracker()); 1360 Assert.assertTrue(ap.serverTrackerTimeout > 200L); 1361 ap.serverTrackerTimeout = 1L; 1362 1363 Put p = createPut(1, false); 1364 mutator.mutate(p); 1365 1366 try { 1367 mutator.flush(); 1368 Assert.fail(); 1369 } catch (RetriesExhaustedWithDetailsException expected) { 1370 assertEquals(1, expected.getNumExceptions()); 1371 assertTrue(expected.getRow(0) == p); 1372 } 1373 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1374 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1375 } 1376 1377 @Test 1378 public void testReadAndWriteTimeout() throws IOException { 1379 final long readTimeout = 10 * 1000; 1380 final long writeTimeout = 20 * 1000; 1381 Configuration copyConf = new Configuration(CONF); 1382 copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); 1383 copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); 1384 ClusterConnection conn = new MyConnectionImpl(copyConf); 1385 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 1386 try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { 1387 ht.multiAp = ap; 1388 List<Get> gets = new ArrayList<>(); 1389 gets.add(new Get(DUMMY_BYTES_1)); 1390 gets.add(new Get(DUMMY_BYTES_2)); 1391 try { 1392 ht.get(gets); 1393 } catch (ClassCastException e) { 1394 // No result response on this test. 1395 } 1396 assertEquals(readTimeout, ap.previousTimeout); 1397 ap.previousTimeout = -1; 1398 1399 try { 1400 ht.existsAll(gets); 1401 } catch (ClassCastException e) { 1402 // No result response on this test. 1403 } 1404 assertEquals(readTimeout, ap.previousTimeout); 1405 ap.previousTimeout = -1; 1406 1407 List<Delete> deletes = new ArrayList<>(); 1408 deletes.add(new Delete(DUMMY_BYTES_1)); 1409 deletes.add(new Delete(DUMMY_BYTES_2)); 1410 ht.delete(deletes); 1411 assertEquals(writeTimeout, ap.previousTimeout); 1412 } 1413 } 1414 1415 @Test 1416 public void testErrors() throws IOException { 1417 ClusterConnection conn = new MyConnectionImpl(CONF); 1418 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); 1419 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1420 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1421 1422 Assert.assertNotNull(ap.createServerErrorTracker()); 1423 1424 Put p = createPut(1, true); 1425 mutator.mutate(p); 1426 1427 try { 1428 mutator.flush(); 1429 Assert.fail(); 1430 } catch (RetriesExhaustedWithDetailsException expected) { 1431 assertEquals(1, expected.getNumExceptions()); 1432 assertTrue(expected.getRow(0) == p); 1433 } 1434 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1435 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1436 } 1437 1438 @Test 1439 public void testCallQueueTooLarge() throws IOException { 1440 ClusterConnection conn = new MyConnectionImpl(CONF); 1441 AsyncProcessWithFailure ap = 1442 new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); 1443 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1444 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1445 Assert.assertNotNull(ap.createServerErrorTracker()); 1446 Put p = createPut(1, true); 1447 mutator.mutate(p); 1448 1449 try { 1450 mutator.flush(); 1451 Assert.fail(); 1452 } catch (RetriesExhaustedWithDetailsException expected) { 1453 assertEquals(1, expected.getNumExceptions()); 1454 assertTrue(expected.getRow(0) == p); 1455 } 1456 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1457 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1458 } 1459 1460 /** 1461 * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 2 1462 * threads: 1 per server, this whatever the number of regions. 1463 */ 1464 @Test 1465 public void testThreadCreation() throws Exception { 1466 final int NB_REGS = 100; 1467 List<HRegionLocation> hrls = new ArrayList<>(NB_REGS); 1468 List<Get> gets = new ArrayList<>(NB_REGS); 1469 for (int i = 0; i < NB_REGS; i++) { 1470 HRegionInfo hri = 1471 new HRegionInfo(DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); 1472 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); 1473 hrls.add(hrl); 1474 1475 Get get = new Get(Bytes.toBytes(i * 10L)); 1476 gets.add(get); 1477 } 1478 1479 MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); 1480 MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); 1481 HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); 1482 ht.multiAp = ap; 1483 ht.batch(gets, null); 1484 1485 Assert.assertEquals(NB_REGS, ap.nbActions.get()); 1486 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); 1487 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); 1488 1489 int nbReg = 0; 1490 for (int i = 0; i < NB_REGS; i++) { 1491 if (con.usedRegions[i]) nbReg++; 1492 } 1493 Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg); 1494 } 1495 1496 @Test 1497 public void testReplicaReplicaSuccess() throws Exception { 1498 // Main call takes too long so replicas succeed, except for one region w/o replicas. 1499 // One region has no replica, so the main call succeeds for it. 1500 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); 1501 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1502 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1503 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1504 .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build(); 1505 AsyncRequestFuture ars = ap.submit(task); 1506 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); 1507 Assert.assertEquals(2, ap.getReplicaCallCount()); 1508 } 1509 1510 @Test 1511 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { 1512 // Main call succeeds before replica calls are kicked off. 1513 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); 1514 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1515 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1516 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1517 .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build(); 1518 AsyncRequestFuture ars = ap.submit(task); 1519 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); 1520 Assert.assertEquals(0, ap.getReplicaCallCount()); 1521 } 1522 1523 @Test 1524 public void testReplicaParallelCallsSucceed() throws Exception { 1525 // Either main or replica can succeed. 1526 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); 1527 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1528 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1529 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1530 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1531 AsyncRequestFuture ars = ap.submit(task); 1532 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); 1533 long replicaCalls = ap.getReplicaCallCount(); 1534 Assert.assertTrue(replicaCalls >= 0); 1535 Assert.assertTrue(replicaCalls <= 2); 1536 } 1537 1538 @Test 1539 public void testReplicaPartialReplicaCall() throws Exception { 1540 // One server is slow, so the result for its region comes from replica, whereas 1541 // the result for other region comes from primary before replica calls happen. 1542 // There should be no replica call for that region at all. 1543 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); 1544 ap.setPrimaryCallDelay(sn2, 2000); 1545 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1546 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1547 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1548 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1549 AsyncRequestFuture ars = ap.submit(task); 1550 verifyReplicaResult(ars, RR.FALSE, RR.TRUE); 1551 Assert.assertEquals(1, ap.getReplicaCallCount()); 1552 } 1553 1554 @Test 1555 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { 1556 // Main calls fail before replica calls can start - this is currently not handled. 1557 // It would probably never happen if we can get location (due to retries), 1558 // and it would require additional synchronization. 1559 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); 1560 ap.addFailures(hri1, hri2); 1561 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1562 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1563 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1564 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1565 AsyncRequestFuture ars = ap.submit(task); 1566 verifyReplicaResult(ars, RR.FAILED, RR.FAILED); 1567 Assert.assertEquals(0, ap.getReplicaCallCount()); 1568 } 1569 1570 @Test 1571 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { 1572 // Main calls fails after replica calls start. For two-replica region, one replica call 1573 // also fails. Regardless, we get replica results for both regions. 1574 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); 1575 ap.addFailures(hri1, hri1r2, hri2); 1576 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1577 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1578 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1579 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1580 AsyncRequestFuture ars = ap.submit(task); 1581 verifyReplicaResult(ars, RR.TRUE, RR.TRUE); 1582 Assert.assertEquals(2, ap.getReplicaCallCount()); 1583 } 1584 1585 @Test 1586 public void testReplicaAllCallsFailForOneRegion() throws Exception { 1587 // For one of the region, all 3, main and replica, calls fail. For the other, replica 1588 // call fails but its exception should not be visible as it did succeed. 1589 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); 1590 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); 1591 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1592 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1593 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1594 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1595 AsyncRequestFuture ars = ap.submit(task); 1596 verifyReplicaResult(ars, RR.FAILED, RR.FALSE); 1597 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 1598 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); 1599 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { 1600 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); 1601 } 1602 } 1603 1604 private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, 1605 int replicaMs) throws Exception { 1606 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); 1607 } 1608 1609 private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, 1610 int replicaMs, int retries) throws Exception { 1611 // TODO: this is kind of timing dependent... perhaps it should detect from createCaller 1612 // that the replica call has happened and that way control the ordering. 1613 Configuration conf = new Configuration(); 1614 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); 1615 if (retries >= 0) { 1616 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1617 } 1618 ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf)); 1619 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); 1620 ap.setCallDelays(primaryMs, replicaMs); 1621 return ap; 1622 } 1623 1624 private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) { 1625 return new BufferedMutatorParams(name).pool(ap.service).rpcTimeout(RPC_TIMEOUT) 1626 .opertationTimeout(OPERATION_TIMEOUT); 1627 } 1628 1629 private static List<Get> makeTimelineGets(byte[]... rows) { 1630 List<Get> result = new ArrayList<>(rows.length); 1631 for (byte[] row : rows) { 1632 Get get = new Get(row); 1633 get.setConsistency(Consistency.TIMELINE); 1634 result.add(get); 1635 } 1636 return result; 1637 } 1638 1639 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { 1640 Object[] actual = ars.getResults(); 1641 Assert.assertEquals(expected.length, actual.length); 1642 for (int i = 0; i < expected.length; ++i) { 1643 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); 1644 } 1645 } 1646 1647 /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ 1648 private enum RR { 1649 TRUE, 1650 FALSE, 1651 DONT_CARE, 1652 FAILED 1653 } 1654 1655 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { 1656 Object[] actuals = ars.getResults(); 1657 Assert.assertEquals(expecteds.length, actuals.length); 1658 for (int i = 0; i < expecteds.length; ++i) { 1659 Object actual = actuals[i]; 1660 RR expected = expecteds[i]; 1661 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); 1662 if (expected != RR.FAILED && expected != RR.DONT_CARE) { 1663 Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale()); 1664 } 1665 } 1666 } 1667 1668 /** 1669 * @param regCnt the region: 1 to 3. 1670 * @param success if true, the put will succeed. 1671 * @return a put 1672 */ 1673 private Put createPut(int regCnt, boolean success) { 1674 Put p; 1675 if (!success) { 1676 p = new Put(FAILS); 1677 } else switch (regCnt) { 1678 case 1: 1679 p = new Put(DUMMY_BYTES_1); 1680 break; 1681 case 2: 1682 p = new Put(DUMMY_BYTES_2); 1683 break; 1684 case 3: 1685 p = new Put(DUMMY_BYTES_3); 1686 break; 1687 default: 1688 throw new IllegalArgumentException("unknown " + regCnt); 1689 } 1690 1691 p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1692 1693 return p; 1694 } 1695 1696 static class MyThreadPoolExecutor extends ThreadPoolExecutor { 1697 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, 1698 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { 1699 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); 1700 } 1701 1702 @Override 1703 public Future submit(Runnable runnable) { 1704 throw new OutOfMemoryError("OutOfMemory error thrown by means"); 1705 } 1706 } 1707 1708 static class AsyncProcessForThrowableCheck extends AsyncProcess { 1709 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { 1710 super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), 1711 new RpcControllerFactory(conf)); 1712 } 1713 } 1714 1715 @Test 1716 public void testUncheckedException() throws Exception { 1717 // Test the case pool.submit throws unchecked exception 1718 ClusterConnection hc = createHConnection(); 1719 MyThreadPoolExecutor myPool = 1720 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); 1721 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); 1722 1723 List<Put> puts = new ArrayList<>(1); 1724 puts.add(createPut(1, true)); 1725 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(myPool).setRpcTimeout(RPC_TIMEOUT) 1726 .setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(puts) 1727 .setSubmittedRows(SubmittedRows.NORMAL).build(); 1728 ap.submit(task); 1729 Assert.assertTrue(puts.isEmpty()); 1730 } 1731 1732 /** 1733 * Below tests make sure we could use a special pause setting when retry an exception where 1734 * {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114 1735 */ 1736 1737 @Test 1738 public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception { 1739 testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException()); 1740 } 1741 1742 @Test 1743 public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception { 1744 testRetryPauseWhenServerIsOverloaded(new CallDroppedException()); 1745 } 1746 1747 @Test 1748 public void testRetryPauseForRpcThrottling() throws IOException { 1749 long waitInterval = 500L; 1750 testRetryPause(new Configuration(CONF), waitInterval, new RpcThrottlingException( 1751 RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test")); 1752 } 1753 1754 private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception) 1755 throws IOException { 1756 Configuration testConf = new Configuration(CONF); 1757 long specialPause = 500L; 1758 testConf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 1759 specialPause); 1760 testRetryPause(testConf, specialPause, exception); 1761 } 1762 1763 private void testRetryPause(Configuration testConf, long expectedPause, 1764 HBaseIOException exception) throws IOException { 1765 1766 final int retries = 1; 1767 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1768 1769 ClusterConnection conn = new MyConnectionImpl(testConf); 1770 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception); 1771 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1772 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1773 1774 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1775 1776 Put p = createPut(1, true); 1777 mutator.mutate(p); 1778 1779 long startTime = EnvironmentEdgeManager.currentTime(); 1780 try { 1781 mutator.flush(); 1782 Assert.fail(); 1783 } catch (RetriesExhaustedWithDetailsException expected) { 1784 assertEquals(1, expected.getNumExceptions()); 1785 assertTrue(expected.getRow(0) == p); 1786 } 1787 long actualSleep = EnvironmentEdgeManager.currentTime() - startTime; 1788 long expectedSleep = 0L; 1789 for (int i = 0; i < retries; i++) { 1790 expectedSleep += ConnectionUtils.getPauseTime(expectedPause, i); 1791 // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result 1792 actualSleep += (long) (expectedPause * 0.01f); 1793 } 1794 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1795 Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", 1796 actualSleep >= expectedSleep); 1797 1798 // check and confirm normal IOE will use the normal pause 1799 final long normalPause = 1800 testConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 1801 ap = new AsyncProcessWithFailure(conn, testConf, new IOException()); 1802 bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1803 mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1804 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1805 mutator.mutate(p); 1806 startTime = EnvironmentEdgeManager.currentTime(); 1807 try { 1808 mutator.flush(); 1809 Assert.fail(); 1810 } catch (RetriesExhaustedWithDetailsException expected) { 1811 assertEquals(1, expected.getNumExceptions()); 1812 assertTrue(expected.getRow(0) == p); 1813 } 1814 actualSleep = EnvironmentEdgeManager.currentTime() - startTime; 1815 expectedSleep = 0L; 1816 for (int i = 0; i < retries; i++) { 1817 expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); 1818 } 1819 // plus an additional pause to balance the program execution time 1820 expectedSleep += normalPause; 1821 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1822 Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); 1823 } 1824 1825 @Test 1826 public void testFastFailIfBackoffGreaterThanRemaining() throws IOException { 1827 Configuration testConf = new Configuration(CONF); 1828 testConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100); 1829 long waitInterval = 500L; 1830 HBaseIOException exception = new RpcThrottlingException( 1831 RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test"); 1832 1833 final int retries = 1; 1834 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1835 1836 ClusterConnection conn = new MyConnectionImpl(testConf); 1837 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception); 1838 BufferedMutatorParams bufferParam = 1839 createBufferedMutatorParams(ap, DUMMY_TABLE).operationTimeout(100); 1840 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1841 1842 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1843 1844 Put p = createPut(1, true); 1845 mutator.mutate(p); 1846 1847 try { 1848 mutator.flush(); 1849 Assert.fail(); 1850 } catch (RetriesExhaustedWithDetailsException expected) { 1851 assertEquals(1, expected.getNumExceptions()); 1852 assertTrue(expected.getCause(0) instanceof OperationTimeoutExceededException); 1853 assertTrue(expected.getCause(0).getMessage().startsWith("Backoff")); 1854 } 1855 } 1856 1857 /** 1858 * Tests that we properly recover from exceptions that DO NOT go through receiveGlobalFailure, due 1859 * to updating the meta cache for the region which failed. Successful multigets can include region 1860 * exceptions in the MultiResponse. In that case, it skips receiveGlobalFailure and instead 1861 * handles in receiveMultiAction 1862 */ 1863 @Test 1864 public void testRetryWithExceptionClearsMetaCacheUsingRegionException() throws Exception { 1865 testRetryWithExceptionClearsMetaCache(true); 1866 } 1867 1868 /** 1869 * Tests that we properly recover from exceptions that go through receiveGlobalFailure, due to 1870 * updating the meta cache for the region which failed. 1871 */ 1872 @Test 1873 public void testRetryWithExceptionClearsMetaCacheUsingServerException() throws Exception { 1874 testRetryWithExceptionClearsMetaCache(false); 1875 } 1876 1877 private void testRetryWithExceptionClearsMetaCache(boolean useRegionException) 1878 throws IOException { 1879 Configuration myConf = new Configuration(CONF); 1880 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 1881 ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf)); 1882 1883 // we pass in loc1.getServerName here so that only calls to that server will fail 1884 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, 1885 new RegionOpeningException("test"), loc1.getServerName(), useRegionException); 1886 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1887 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1888 1889 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1890 1891 Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1892 new RegionLocations(loc1).toString()); 1893 1894 // simulate updateCachedLocations, by changing the loc for this row to loc3. only loc1 fails, 1895 // so this means retry will succeed 1896 Mockito.doAnswer(invocation -> { 1897 setMockLocation(conn, DUMMY_BYTES_1, new RegionLocations(loc3)); 1898 return null; 1899 }).when(conn).updateCachedLocations(Mockito.eq(DUMMY_TABLE), 1900 Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(), 1901 Mockito.eq(loc1.getServerName())); 1902 1903 // Ensure we haven't called updateCachedLocations yet 1904 Mockito.verify(conn, Mockito.times(0)).updateCachedLocations(Mockito.any(), Mockito.any(), 1905 Mockito.any(), Mockito.any(), Mockito.any()); 1906 1907 Put p = createPut(1, true); 1908 mutator.mutate(p); 1909 1910 // we expect this to succeed because the bad region location should be updated upon 1911 // the initial failure causing retries to succeed. 1912 mutator.flush(); 1913 1914 // validate that we updated the location, as we expected 1915 Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1916 new RegionLocations(loc3).toString()); 1917 // this is a given since the location updated, but validate that we called updateCachedLocations 1918 Mockito.verify(conn, Mockito.atLeastOnce()).updateCachedLocations(Mockito.eq(DUMMY_TABLE), 1919 Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(), 1920 Mockito.eq(loc1.getServerName())); 1921 } 1922 1923 @Test 1924 public void testQueueRowAccess() throws Exception { 1925 ClusterConnection conn = createHConnection(); 1926 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, 1927 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); 1928 Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1929 Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 1930 mutator.mutate(p0); 1931 BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); 1932 // QueueRowAccess should take all undealt mutations 1933 assertEquals(0, mutator.size()); 1934 mutator.mutate(p1); 1935 assertEquals(1, mutator.size()); 1936 BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); 1937 // QueueRowAccess should take all undealt mutations 1938 assertEquals(0, mutator.size()); 1939 assertEquals(1, ra0.size()); 1940 assertEquals(1, ra1.size()); 1941 Iterator<Row> iter0 = ra0.iterator(); 1942 Iterator<Row> iter1 = ra1.iterator(); 1943 assertTrue(iter0.hasNext()); 1944 assertTrue(iter1.hasNext()); 1945 // the next() will poll the mutation from inner buffer and update the buffer count 1946 assertTrue(iter0.next() == p0); 1947 assertEquals(1, mutator.getUnflushedSize()); 1948 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1949 assertTrue(iter1.next() == p1); 1950 assertEquals(0, mutator.getUnflushedSize()); 1951 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1952 assertFalse(iter0.hasNext()); 1953 assertFalse(iter1.hasNext()); 1954 // ra0 doest handle the mutation so the mutation won't be pushed back to buffer 1955 iter0.remove(); 1956 ra0.close(); 1957 assertEquals(0, mutator.size()); 1958 assertEquals(0, mutator.getUnflushedSize()); 1959 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1960 // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer 1961 ra1.close(); 1962 assertEquals(1, mutator.size()); 1963 assertEquals(1, mutator.getUnflushedSize()); 1964 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1965 } 1966}