001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.AbstractMap.SimpleEntry; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 047 048/** 049 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. Each put 050 * will be sharded into different buffer queues based on its destination region server. So each 051 * region server buffer queue will only have the puts which share the same destination. And each 052 * queue will have a flush worker thread to flush the puts request to the region server. If any 053 * queue is full, the HTableMultiplexer starts to drop the Put requests for that particular queue. 054 * </p> 055 * Also all the puts will be retried as a configuration number before dropping. And the 056 * HTableMultiplexer can report the number of buffered requests and the number of the failed 057 * (dropped) requests in total or on per region server basis. 058 * <p/> 059 * This class is thread safe. 060 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 061 * {@link BufferedMutator} for batching mutations. 062 */ 063@Deprecated 064@InterfaceAudience.Public 065public class HTableMultiplexer { 066 private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName()); 067 068 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = 069 "hbase.tablemultiplexer.flush.period.ms"; 070 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; 071 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = 072 "hbase.client.max.retries.in.queue"; 073 074 /** The map between each region server to its flush worker */ 075 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = 076 new ConcurrentHashMap<>(); 077 078 private final Configuration conf; 079 private final ClusterConnection conn; 080 private final ExecutorService pool; 081 private final int maxAttempts; 082 private final int perRegionServerBufferQueueSize; 083 private final int maxKeyValueSize; 084 private final ScheduledExecutorService executor; 085 private final long flushPeriod; 086 087 /** 088 * @param conf The HBaseConfiguration 089 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 090 * each region server before dropping the request. 091 */ 092 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) 093 throws IOException { 094 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); 095 } 096 097 /** 098 * @param conn The HBase connection. 099 * @param conf The HBase configuration 100 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 101 * each region server before dropping the request. 102 */ 103 public HTableMultiplexer(Connection conn, Configuration conf, 104 int perRegionServerBufferQueueSize) { 105 this.conn = (ClusterConnection) conn; 106 this.pool = HTable.getDefaultExecutor(conf); 107 // how many times we could try in total, one more than retry number 108 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 109 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; 110 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; 111 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); 112 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); 113 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); 114 this.executor = Executors.newScheduledThreadPool(initThreads, 115 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); 116 this.conf = conf; 117 } 118 119 /** 120 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already been 121 * closed. 122 * @throws IOException If there is an error closing the connection. 123 */ 124 public synchronized void close() throws IOException { 125 if (!getConnection().isClosed()) { 126 getConnection().close(); 127 } 128 } 129 130 /** 131 * The put request will be buffered by its corresponding buffer queue. Return false if the queue 132 * is already full. 133 * @return true if the request can be accepted by its corresponding buffer queue. 134 */ 135 public boolean put(TableName tableName, final Put put) { 136 return put(tableName, put, this.maxAttempts); 137 } 138 139 /** 140 * The puts request will be buffered by their corresponding buffer queue. Return the list of puts 141 * which could not be queued. 142 * @return the list of puts which could not be queued 143 */ 144 public List<Put> put(TableName tableName, final List<Put> puts) { 145 if (puts == null) return null; 146 147 List<Put> failedPuts = null; 148 boolean result; 149 for (Put put : puts) { 150 result = put(tableName, put, this.maxAttempts); 151 if (result == false) { 152 153 // Create the failed puts list if necessary 154 if (failedPuts == null) { 155 failedPuts = new ArrayList<>(); 156 } 157 // Add the put to the failed puts list 158 failedPuts.add(put); 159 } 160 } 161 return failedPuts; 162 } 163 164 /** 165 * @deprecated Use {@link #put(TableName, List) } instead. 166 */ 167 @Deprecated 168 public List<Put> put(byte[] tableName, final List<Put> puts) { 169 return put(TableName.valueOf(tableName), puts); 170 } 171 172 /** 173 * The put request will be buffered by its corresponding buffer queue. And the put request will be 174 * retried before dropping the request. Return false if the queue is already full. 175 * @return true if the request can be accepted by its corresponding buffer queue. 176 */ 177 public boolean put(final TableName tableName, final Put put, int maxAttempts) { 178 if (maxAttempts <= 0) { 179 return false; 180 } 181 182 try { 183 ConnectionUtils.validatePut(put, maxKeyValueSize); 184 // Allow mocking to get at the connection, but don't expose the connection to users. 185 ClusterConnection conn = (ClusterConnection) getConnection(); 186 // AsyncProcess in the FlushWorker should take care of refreshing the location cache 187 // as necessary. We shouldn't have to do that here. 188 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); 189 if (loc != null) { 190 // Add the put pair into its corresponding queue. 191 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); 192 193 // Generate a MultiPutStatus object and offer it into the queue 194 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); 195 196 return queue.offer(s); 197 } 198 } catch (IOException e) { 199 LOG.debug("Cannot process the put " + put, e); 200 } 201 return false; 202 } 203 204 /** 205 * @deprecated Use {@link #put(TableName, Put) } instead. 206 */ 207 @Deprecated 208 public boolean put(final byte[] tableName, final Put put, int retry) { 209 return put(TableName.valueOf(tableName), put, retry); 210 } 211 212 /** 213 * @deprecated Use {@link #put(TableName, Put)} instead. 214 */ 215 @Deprecated 216 public boolean put(final byte[] tableName, Put put) { 217 return put(TableName.valueOf(tableName), put); 218 } 219 220 /** Returns the current HTableMultiplexerStatus */ 221 public HTableMultiplexerStatus getHTableMultiplexerStatus() { 222 return new HTableMultiplexerStatus(serverToFlushWorkerMap); 223 } 224 225 @InterfaceAudience.Private 226 @SuppressWarnings("FutureReturnValueIgnored") 227 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { 228 FlushWorker worker = serverToFlushWorkerMap.get(addr); 229 if (worker == null) { 230 synchronized (this.serverToFlushWorkerMap) { 231 worker = serverToFlushWorkerMap.get(addr); 232 if (worker == null) { 233 // Create the flush worker 234 worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize, 235 pool, executor); 236 this.serverToFlushWorkerMap.put(addr, worker); 237 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); 238 } 239 } 240 } 241 return worker.getQueue(); 242 } 243 244 @InterfaceAudience.Private 245 ClusterConnection getConnection() { 246 return this.conn; 247 } 248 249 /** 250 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. report the 251 * number of buffered requests and the number of the failed (dropped) requests in total or on per 252 * region server basis. 253 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 254 * {@link BufferedMutator} for batching mutations. 255 */ 256 @Deprecated 257 @InterfaceAudience.Public 258 public static class HTableMultiplexerStatus { 259 private long totalFailedPutCounter; 260 private long totalBufferedPutCounter; 261 private long maxLatency; 262 private long overallAverageLatency; 263 private Map<String, Long> serverToFailedCounterMap; 264 private Map<String, Long> serverToBufferedCounterMap; 265 private Map<String, Long> serverToAverageLatencyMap; 266 private Map<String, Long> serverToMaxLatencyMap; 267 268 public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 269 this.totalBufferedPutCounter = 0; 270 this.totalFailedPutCounter = 0; 271 this.maxLatency = 0; 272 this.overallAverageLatency = 0; 273 this.serverToBufferedCounterMap = new HashMap<>(); 274 this.serverToFailedCounterMap = new HashMap<>(); 275 this.serverToAverageLatencyMap = new HashMap<>(); 276 this.serverToMaxLatencyMap = new HashMap<>(); 277 this.initialize(serverToFlushWorkerMap); 278 } 279 280 private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 281 if (serverToFlushWorkerMap == null) { 282 return; 283 } 284 285 long averageCalcSum = 0; 286 int averageCalcCount = 0; 287 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) { 288 HRegionLocation addr = entry.getKey(); 289 FlushWorker worker = entry.getValue(); 290 291 long bufferedCounter = worker.getTotalBufferedCount(); 292 long failedCounter = worker.getTotalFailedCount(); 293 long serverMaxLatency = worker.getMaxLatency(); 294 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); 295 // Get sum and count pieces separately to compute overall average 296 SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents(); 297 long serverAvgLatency = averageCounter.getAndReset(); 298 299 this.totalBufferedPutCounter += bufferedCounter; 300 this.totalFailedPutCounter += failedCounter; 301 if (serverMaxLatency > this.maxLatency) { 302 this.maxLatency = serverMaxLatency; 303 } 304 averageCalcSum += averageComponents.getKey(); 305 averageCalcCount += averageComponents.getValue(); 306 307 this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter); 308 this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter); 309 this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency); 310 this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency); 311 } 312 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0; 313 } 314 315 public long getTotalBufferedCounter() { 316 return this.totalBufferedPutCounter; 317 } 318 319 public long getTotalFailedCounter() { 320 return this.totalFailedPutCounter; 321 } 322 323 public long getMaxLatency() { 324 return this.maxLatency; 325 } 326 327 public long getOverallAverageLatency() { 328 return this.overallAverageLatency; 329 } 330 331 public Map<String, Long> getBufferedCounterForEachRegionServer() { 332 return this.serverToBufferedCounterMap; 333 } 334 335 public Map<String, Long> getFailedCounterForEachRegionServer() { 336 return this.serverToFailedCounterMap; 337 } 338 339 public Map<String, Long> getMaxLatencyForEachRegionServer() { 340 return this.serverToMaxLatencyMap; 341 } 342 343 public Map<String, Long> getAverageLatencyForEachRegionServer() { 344 return this.serverToAverageLatencyMap; 345 } 346 } 347 348 @InterfaceAudience.Private 349 static class PutStatus { 350 final RegionInfo regionInfo; 351 final Put put; 352 final int maxAttempCount; 353 354 public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) { 355 this.regionInfo = regionInfo; 356 this.put = put; 357 this.maxAttempCount = maxAttempCount; 358 } 359 } 360 361 /** 362 * Helper to count the average over an interval until reset. 363 */ 364 private static class AtomicAverageCounter { 365 private long sum; 366 private int count; 367 368 public AtomicAverageCounter() { 369 this.sum = 0L; 370 this.count = 0; 371 } 372 373 public synchronized long getAndReset() { 374 long result = this.get(); 375 this.reset(); 376 return result; 377 } 378 379 public synchronized long get() { 380 if (this.count == 0) { 381 return 0; 382 } 383 return this.sum / this.count; 384 } 385 386 public synchronized SimpleEntry<Long, Integer> getComponents() { 387 return new SimpleEntry<>(sum, count); 388 } 389 390 public synchronized void reset() { 391 this.sum = 0L; 392 this.count = 0; 393 } 394 395 public synchronized void add(long value) { 396 this.sum += value; 397 this.count++; 398 } 399 } 400 401 @InterfaceAudience.Private 402 static class FlushWorker implements Runnable { 403 private final HRegionLocation addr; 404 private final LinkedBlockingQueue<PutStatus> queue; 405 private final HTableMultiplexer multiplexer; 406 private final AtomicLong totalFailedPutCount = new AtomicLong(0); 407 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); 408 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); 409 private final AtomicLong maxLatency = new AtomicLong(0); 410 411 private final AsyncProcess ap; 412 private final List<PutStatus> processingList = new ArrayList<>(); 413 private final ScheduledExecutorService executor; 414 private final int maxRetryInQueue; 415 private final AtomicInteger retryInQueue = new AtomicInteger(0); 416 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor 417 private final int operationTimeout; 418 private final ExecutorService pool; 419 420 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, 421 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, 422 ScheduledExecutorService executor) { 423 this.addr = addr; 424 this.multiplexer = htableMultiplexer; 425 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); 426 final ConnectionConfiguration connectionConfig = 427 conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf); 428 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, 429 connectionConfig, conn == null ? null : conn.getConnectionMetrics()); 430 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); 431 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 432 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 433 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 434 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 435 // Specify 0 retries in AsyncProcess because we need to reassign puts to different queues 436 // if regions are moved. 437 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0); 438 this.executor = executor; 439 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); 440 this.pool = pool; 441 } 442 443 protected LinkedBlockingQueue<PutStatus> getQueue() { 444 return this.queue; 445 } 446 447 public long getTotalFailedCount() { 448 return totalFailedPutCount.get(); 449 } 450 451 public long getTotalBufferedCount() { 452 return (long) queue.size() + currentProcessingCount.get(); 453 } 454 455 public AtomicAverageCounter getAverageLatencyCounter() { 456 return this.averageLatency; 457 } 458 459 public long getMaxLatency() { 460 return this.maxLatency.getAndSet(0); 461 } 462 463 @SuppressWarnings("FutureReturnValueIgnored") 464 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { 465 // Decrease the retry count 466 final int retryCount = ps.maxAttempCount - 1; 467 468 if (retryCount <= 0) { 469 // Update the failed counter and no retry any more. 470 return false; 471 } 472 473 int cnt = getRetryInQueue().incrementAndGet(); 474 if (cnt > getMaxRetryInQueue()) { 475 // Too many Puts in queue for resubmit, give up this 476 getRetryInQueue().decrementAndGet(); 477 return false; 478 } 479 480 final Put failedPut = ps.put; 481 // The currentPut is failed. So get the table name for the currentPut. 482 final TableName tableName = ps.regionInfo.getTable(); 483 484 long delayMs = getNextDelay(retryCount); 485 if (LOG.isDebugEnabled()) { 486 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); 487 } 488 489 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating 490 // the region location cache when the Put original failed with some exception. If we keep 491 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff 492 // that we expect it to. 493 getExecutor().schedule(new Runnable() { 494 @Override 495 public void run() { 496 boolean succ = false; 497 try { 498 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); 499 } finally { 500 FlushWorker.this.getRetryInQueue().decrementAndGet(); 501 if (!succ) { 502 FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); 503 } 504 } 505 } 506 }, delayMs, TimeUnit.MILLISECONDS); 507 return true; 508 } 509 510 @InterfaceAudience.Private 511 long getNextDelay(int retryCount) { 512 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, 513 multiplexer.maxAttempts - retryCount - 1); 514 } 515 516 @InterfaceAudience.Private 517 AtomicInteger getRetryInQueue() { 518 return this.retryInQueue; 519 } 520 521 @InterfaceAudience.Private 522 int getMaxRetryInQueue() { 523 return this.maxRetryInQueue; 524 } 525 526 @InterfaceAudience.Private 527 AtomicLong getTotalFailedPutCount() { 528 return this.totalFailedPutCount; 529 } 530 531 @InterfaceAudience.Private 532 HTableMultiplexer getMultiplexer() { 533 return this.multiplexer; 534 } 535 536 @InterfaceAudience.Private 537 ScheduledExecutorService getExecutor() { 538 return this.executor; 539 } 540 541 @Override 542 public void run() { 543 int failedCount = 0; 544 try { 545 long start = EnvironmentEdgeManager.currentTime(); 546 547 // drain all the queued puts into the tmp list 548 processingList.clear(); 549 queue.drainTo(processingList); 550 if (processingList.isEmpty()) { 551 // Nothing to flush 552 return; 553 } 554 555 currentProcessingCount.set(processingList.size()); 556 // failedCount is decreased whenever a Put is success or resubmit. 557 failedCount = processingList.size(); 558 559 List<Action> retainedActions = new ArrayList<>(processingList.size()); 560 MultiAction actions = new MultiAction(); 561 for (int i = 0; i < processingList.size(); i++) { 562 PutStatus putStatus = processingList.get(i); 563 Action action = new Action(putStatus.put, i); 564 actions.add(putStatus.regionInfo.getRegionName(), action); 565 retainedActions.add(action); 566 } 567 568 // Process this multi-put request 569 List<PutStatus> failed = null; 570 Object[] results = new Object[actions.size()]; 571 ServerName server = addr.getServerName(); 572 Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions); 573 try { 574 AsyncProcessTask task = AsyncProcessTask.newBuilder().setResults(results).setPool(pool) 575 .setRpcTimeout(writeRpcTimeout).setOperationTimeout(operationTimeout).build(); 576 AsyncRequestFuture arf = 577 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); 578 arf.waitUntilDone(); 579 if (arf.hasError()) { 580 // We just log and ignore the exception here since failed Puts will be resubmit again. 581 LOG.debug("Caught some exceptions when flushing puts to region server " 582 + addr.getHostnamePort(), arf.getErrors()); 583 } 584 } finally { 585 for (int i = 0; i < results.length; i++) { 586 if (results[i] instanceof Result) { 587 failedCount--; 588 } else { 589 if (failed == null) { 590 failed = new ArrayList<>(); 591 } 592 failed.add(processingList.get(i)); 593 } 594 } 595 } 596 597 if (failed != null) { 598 // Resubmit failed puts 599 for (PutStatus putStatus : failed) { 600 if (resubmitFailedPut(putStatus, this.addr)) { 601 failedCount--; 602 } 603 } 604 } 605 606 long elapsed = EnvironmentEdgeManager.currentTime() - start; 607 // Update latency counters 608 averageLatency.add(elapsed); 609 if (elapsed > maxLatency.get()) { 610 maxLatency.set(elapsed); 611 } 612 613 // Log some basic info 614 if (LOG.isDebugEnabled()) { 615 LOG.debug( 616 "Processed " + currentProcessingCount + " put requests for " + addr.getHostnamePort() 617 + " and " + failedCount + " failed" + ", latency for this send: " + elapsed); 618 } 619 620 // Reset the current processing put count 621 currentProcessingCount.set(0); 622 } catch (RuntimeException e) { 623 // To make findbugs happy 624 // Log all the exceptions and move on 625 LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " 626 + addr.getHostnamePort(), e); 627 } catch (Exception e) { 628 if (e instanceof InterruptedException) { 629 Thread.currentThread().interrupt(); 630 } 631 // Log all the exceptions and move on 632 LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " 633 + addr.getHostnamePort(), e); 634 } finally { 635 // Update the totalFailedCount 636 this.totalFailedPutCount.addAndGet(failedCount); 637 } 638 } 639 } 640}