001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.TreeSet; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.concurrent.atomic.AtomicLong; 036import java.util.function.Consumer; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionInfo; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.EnvironmentEdge; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.apache.yetus.audience.InterfaceStability; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * Holds back the requests if they reach any thresholds. 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055class SimpleRequestController implements RequestController { 056 private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class); 057 /** 058 * The maximum heap size for each request. 059 */ 060 public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 061 "hbase.client.max.perrequest.heapsize"; 062 063 /** 064 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}. 065 */ 066 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; 067 068 /** 069 * The maximum number of rows for each request. 070 */ 071 public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows"; 072 /** 073 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}. 074 */ 075 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048; 076 077 /** 078 * The maximum size of submit. 079 */ 080 public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; 081 /** 082 * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}. 083 */ 084 static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = 085 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; 086 final AtomicLong tasksInProgress = new AtomicLong(0); 087 final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = 088 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 089 final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>(); 090 /** 091 * The number of tasks simultaneously executed on the cluster. 092 */ 093 private final int maxTotalConcurrentTasks; 094 095 /** 096 * The maximum heap size for each request. 097 */ 098 private final long maxHeapSizePerRequest; 099 /** 100 * The maximum number of rows for each request. 101 */ 102 private final long maxRowsPerRequest; 103 private final long maxHeapSizeSubmit; 104 /** 105 * The number of tasks we run in parallel on a single region. With 1 (the default) , we ensure 106 * that the ordering of the queries is respected: we don't start a set of operations on a region 107 * before the previous one is done. As well, this limits the pressure we put on the region server. 108 */ 109 final int maxConcurrentTasksPerRegion; 110 111 /** 112 * The number of task simultaneously executed on a single region server. 113 */ 114 final int maxConcurrentTasksPerServer; 115 private final int thresholdToLogUndoneTaskDetails; 116 public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 117 "hbase.client.threshold.log.details"; 118 private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; 119 public static final String THRESHOLD_TO_LOG_REGION_DETAILS = 120 "hbase.client.threshold.log.region.details"; 121 private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2; 122 private final int thresholdToLogRegionDetails; 123 124 SimpleRequestController(final Configuration conf) { 125 this.maxTotalConcurrentTasks = checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 126 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); 127 this.maxConcurrentTasksPerServer = 128 checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, 129 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); 130 this.maxConcurrentTasksPerRegion = 131 checkAndGet(conf, HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, 132 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); 133 this.maxHeapSizePerRequest = checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 134 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 135 this.maxRowsPerRequest = 136 checkAndGet(conf, HBASE_CLIENT_MAX_PERREQUEST_ROWS, DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS); 137 this.maxHeapSizeSubmit = 138 checkAndGet(conf, HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); 139 this.thresholdToLogUndoneTaskDetails = conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, 140 DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); 141 this.thresholdToLogRegionDetails = 142 conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS, DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS); 143 } 144 145 private static int checkAndGet(Configuration conf, String key, int defaultValue) { 146 int value = conf.getInt(key, defaultValue); 147 if (value <= 0) { 148 throw new IllegalArgumentException(key + "=" + value); 149 } 150 return value; 151 } 152 153 private static long checkAndGet(Configuration conf, String key, long defaultValue) { 154 long value = conf.getLong(key, defaultValue); 155 if (value <= 0) { 156 throw new IllegalArgumentException(key + "=" + value); 157 } 158 return value; 159 } 160 161 static Checker newChecker(List<RowChecker> checkers) { 162 return new Checker() { 163 private boolean isEnd = false; 164 165 @Override 166 public ReturnCode canTakeRow(HRegionLocation loc, Row row) { 167 if (isEnd) { 168 return ReturnCode.END; 169 } 170 long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0; 171 ReturnCode code = ReturnCode.INCLUDE; 172 for (RowChecker checker : checkers) { 173 switch (checker.canTakeOperation(loc, heapSizeOfRow)) { 174 case END: 175 isEnd = true; 176 code = ReturnCode.END; 177 break; 178 case SKIP: 179 code = ReturnCode.SKIP; 180 break; 181 case INCLUDE: 182 default: 183 break; 184 } 185 if (code == ReturnCode.END) { 186 break; 187 } 188 } 189 for (RowChecker checker : checkers) { 190 checker.notifyFinal(code, loc, heapSizeOfRow); 191 } 192 return code; 193 } 194 195 @Override 196 public void reset() throws InterruptedIOException { 197 isEnd = false; 198 InterruptedIOException e = null; 199 for (RowChecker checker : checkers) { 200 try { 201 checker.reset(); 202 } catch (InterruptedIOException ex) { 203 e = ex; 204 } 205 } 206 if (e != null) { 207 throw e; 208 } 209 } 210 }; 211 } 212 213 @Override 214 public Checker newChecker() { 215 List<RowChecker> checkers = new ArrayList<>(4); 216 checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, maxConcurrentTasksPerServer, 217 maxConcurrentTasksPerRegion, tasksInProgress, taskCounterPerServer, taskCounterPerRegion)); 218 checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest)); 219 checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit)); 220 checkers.add(new RequestRowsChecker(maxRowsPerRequest)); 221 return newChecker(checkers); 222 } 223 224 @Override 225 public void incTaskCounters(Collection<byte[]> regions, ServerName sn) { 226 tasksInProgress.incrementAndGet(); 227 228 computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); 229 230 regions 231 .forEach((regBytes) -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new) 232 .incrementAndGet()); 233 } 234 235 @Override 236 public void decTaskCounters(Collection<byte[]> regions, ServerName sn) { 237 regions.forEach(regBytes -> { 238 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); 239 regionCnt.decrementAndGet(); 240 }); 241 242 taskCounterPerServer.get(sn).decrementAndGet(); 243 tasksInProgress.decrementAndGet(); 244 synchronized (tasksInProgress) { 245 tasksInProgress.notifyAll(); 246 } 247 } 248 249 @Override 250 public long getNumberOfTasksInProgress() { 251 return tasksInProgress.get(); 252 } 253 254 @Override 255 public void waitForMaximumCurrentTasks(long max, long id, int periodToTrigger, 256 Consumer<Long> trigger) throws InterruptedIOException { 257 assert max >= 0; 258 long lastLog = EnvironmentEdgeManager.currentTime(); 259 long currentInProgress, oldInProgress = Long.MAX_VALUE; 260 while ((currentInProgress = tasksInProgress.get()) > max) { 261 if (oldInProgress != currentInProgress) { // Wait for in progress to change. 262 long now = EnvironmentEdgeManager.currentTime(); 263 if (now > lastLog + periodToTrigger) { 264 lastLog = now; 265 if (trigger != null) { 266 trigger.accept(currentInProgress); 267 } 268 logDetailsOfUndoneTasks(currentInProgress); 269 } 270 } 271 oldInProgress = currentInProgress; 272 try { 273 synchronized (tasksInProgress) { 274 if (tasksInProgress.get() == oldInProgress) { 275 tasksInProgress.wait(10); 276 } 277 } 278 } catch (InterruptedException e) { 279 throw new InterruptedIOException( 280 "#" + id + ", interrupted." + " currentNumberOfTask=" + currentInProgress); 281 } 282 } 283 } 284 285 private void logDetailsOfUndoneTasks(long taskInProgress) { 286 if (taskInProgress <= thresholdToLogUndoneTaskDetails) { 287 ArrayList<ServerName> servers = new ArrayList<>(); 288 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) { 289 if (entry.getValue().get() > 0) { 290 servers.add(entry.getKey()); 291 } 292 } 293 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); 294 } 295 296 if (taskInProgress <= thresholdToLogRegionDetails) { 297 ArrayList<String> regions = new ArrayList<>(); 298 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) { 299 if (entry.getValue().get() > 0) { 300 regions.add(Bytes.toString(entry.getKey())); 301 } 302 } 303 LOG.info("Regions against which left over task(s) are processed: " + regions); 304 } 305 } 306 307 @Override 308 public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) 309 throws InterruptedIOException { 310 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger); 311 } 312 313 /** 314 * limit the heapsize of total submitted data. Reduce the limit of heapsize for submitting quickly 315 * if there is no running task. 316 */ 317 static class SubmittedSizeChecker implements RowChecker { 318 319 private final long maxHeapSizeSubmit; 320 private long heapSize = 0; 321 322 SubmittedSizeChecker(final long maxHeapSizeSubmit) { 323 this.maxHeapSizeSubmit = maxHeapSizeSubmit; 324 } 325 326 @Override 327 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 328 if (heapSize >= maxHeapSizeSubmit) { 329 return ReturnCode.END; 330 } 331 return ReturnCode.INCLUDE; 332 } 333 334 @Override 335 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 336 if (code == ReturnCode.INCLUDE) { 337 heapSize += heapSizeOfRow; 338 } 339 } 340 341 @Override 342 public void reset() { 343 heapSize = 0; 344 } 345 } 346 347 /** 348 * limit the max number of tasks in an AsyncProcess. 349 */ 350 static class TaskCountChecker implements RowChecker { 351 352 private static final long MAX_WAITING_TIME = 1000; // ms 353 private final Set<HRegionInfo> regionsIncluded = new HashSet<>(); 354 private final Set<ServerName> serversIncluded = new HashSet<>(); 355 private final int maxConcurrentTasksPerRegion; 356 private final int maxTotalConcurrentTasks; 357 private final int maxConcurrentTasksPerServer; 358 private final Map<byte[], AtomicInteger> taskCounterPerRegion; 359 private final Map<ServerName, AtomicInteger> taskCounterPerServer; 360 private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); 361 private final AtomicLong tasksInProgress; 362 363 TaskCountChecker(final int maxTotalConcurrentTasks, final int maxConcurrentTasksPerServer, 364 final int maxConcurrentTasksPerRegion, final AtomicLong tasksInProgress, 365 final Map<ServerName, AtomicInteger> taskCounterPerServer, 366 final Map<byte[], AtomicInteger> taskCounterPerRegion) { 367 this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; 368 this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; 369 this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; 370 this.taskCounterPerRegion = taskCounterPerRegion; 371 this.taskCounterPerServer = taskCounterPerServer; 372 this.tasksInProgress = tasksInProgress; 373 } 374 375 @Override 376 public void reset() throws InterruptedIOException { 377 // prevent the busy-waiting 378 waitForRegion(); 379 regionsIncluded.clear(); 380 serversIncluded.clear(); 381 busyRegions.clear(); 382 } 383 384 private void waitForRegion() throws InterruptedIOException { 385 if (busyRegions.isEmpty()) { 386 return; 387 } 388 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 389 final long start = ee.currentTime(); 390 while ((ee.currentTime() - start) <= MAX_WAITING_TIME) { 391 for (byte[] region : busyRegions) { 392 AtomicInteger count = taskCounterPerRegion.get(region); 393 if (count == null || count.get() < maxConcurrentTasksPerRegion) { 394 return; 395 } 396 } 397 try { 398 synchronized (tasksInProgress) { 399 tasksInProgress.wait(10); 400 } 401 } catch (InterruptedException e) { 402 throw new InterruptedIOException("Interrupted." + " tasksInProgress=" + tasksInProgress); 403 } 404 } 405 } 406 407 /** 408 * 1) check the regions is allowed. 2) check the concurrent tasks for regions. 3) check the 409 * total concurrent tasks. 4) check the concurrent tasks for server. 410 * @param loc the destination of data 411 * @param heapSizeOfRow the data size 412 * @return either Include {@link RequestController.ReturnCode} or skip 413 * {@link RequestController.ReturnCode} 414 */ 415 @Override 416 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 417 418 HRegionInfo regionInfo = loc.getRegionInfo(); 419 if (regionsIncluded.contains(regionInfo)) { 420 // We already know what to do with this region. 421 return ReturnCode.INCLUDE; 422 } 423 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); 424 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { 425 // Too many tasks on this region already. 426 return ReturnCode.SKIP; 427 } 428 int newServers = 429 serversIncluded.size() + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); 430 if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { 431 // Too many tasks. 432 return ReturnCode.SKIP; 433 } 434 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); 435 if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { 436 // Too many tasks for this individual server 437 return ReturnCode.SKIP; 438 } 439 return ReturnCode.INCLUDE; 440 } 441 442 @Override 443 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 444 if (code == ReturnCode.INCLUDE) { 445 regionsIncluded.add(loc.getRegionInfo()); 446 serversIncluded.add(loc.getServerName()); 447 } 448 busyRegions.add(loc.getRegionInfo().getRegionName()); 449 } 450 } 451 452 /** 453 * limit the number of rows for each request. 454 */ 455 static class RequestRowsChecker implements RowChecker { 456 457 private final long maxRowsPerRequest; 458 private final Map<ServerName, Long> serverRows = new HashMap<>(); 459 460 RequestRowsChecker(final long maxRowsPerRequest) { 461 this.maxRowsPerRequest = maxRowsPerRequest; 462 } 463 464 @Override 465 public void reset() { 466 serverRows.clear(); 467 } 468 469 @Override 470 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 471 long currentRows = 472 serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L; 473 // accept at least one row 474 if (currentRows == 0 || currentRows < maxRowsPerRequest) { 475 return ReturnCode.INCLUDE; 476 } 477 return ReturnCode.SKIP; 478 } 479 480 @Override 481 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 482 if (code == ReturnCode.INCLUDE) { 483 long currentRows = 484 serverRows.containsKey(loc.getServerName()) ? serverRows.get(loc.getServerName()) : 0L; 485 serverRows.put(loc.getServerName(), currentRows + 1); 486 } 487 } 488 } 489 490 /** 491 * limit the heap size for each request. 492 */ 493 static class RequestHeapSizeChecker implements RowChecker { 494 495 private final long maxHeapSizePerRequest; 496 private final Map<ServerName, Long> serverRequestSizes = new HashMap<>(); 497 498 RequestHeapSizeChecker(final long maxHeapSizePerRequest) { 499 this.maxHeapSizePerRequest = maxHeapSizePerRequest; 500 } 501 502 @Override 503 public void reset() { 504 serverRequestSizes.clear(); 505 } 506 507 @Override 508 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) { 509 // Is it ok for limit of request size? 510 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) 511 ? serverRequestSizes.get(loc.getServerName()) 512 : 0L; 513 // accept at least one request 514 if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) { 515 return ReturnCode.INCLUDE; 516 } 517 return ReturnCode.SKIP; 518 } 519 520 @Override 521 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) { 522 if (code == ReturnCode.INCLUDE) { 523 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) 524 ? serverRequestSizes.get(loc.getServerName()) 525 : 0L; 526 serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow); 527 } 528 } 529 } 530 531 /** 532 * Provide a way to control the flow of rows iteration. 533 */ 534 interface RowChecker { 535 536 ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow); 537 538 /** 539 * Add the final ReturnCode to the checker. The ReturnCode may be reversed, so the checker need 540 * the final decision to update the inner state. 541 * @param code The final decision 542 * @param loc the destination of data 543 * @param heapSizeOfRow the data size 544 */ 545 void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow); 546 547 /** 548 * Reset the inner state. 549 */ 550 void reset() throws InterruptedIOException; 551 } 552}