/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables. Each put
 * is sharded into a buffer queue based on its destination region server, so each region server
 * buffer queue only holds the puts that share the same destination. Each queue has a flush worker
 * thread that flushes the buffered put requests to the region server. If any queue is full, the
 * HTableMultiplexer starts to drop the Put requests for that particular queue.
 * <p>
 * All puts are retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of failed (dropped)
 * requests, in total or on a per region server basis.
 * <p>
 * This class is thread safe.
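 * <p>
 * A minimal usage sketch. The table name {@code "t1"}, the row/family/qualifier names and the
 * per-server queue size of 1000 below are illustrative assumptions, not required values:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Optionally tune the multiplexer through its public configuration keys.
 * conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
 *
 * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);
 * try {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   if (!multiplexer.put(TableName.valueOf("t1"), put)) {
 *     // The buffer queue for the destination region server was full; the put was rejected.
 *   }
 * } finally {
 *   multiplexer.close();
 * }
 * }</pre>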
 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
 *             {@link BufferedMutator} for batching mutations.
 */
@Deprecated
@InterfaceAudience.Public
public class HTableMultiplexer {
  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
    "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
    "hbase.client.max.retries.in.queue";

  /** Maps each region server to its flush worker. */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
    new ConcurrentHashMap<>();

  private final Configuration conf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf                           The HBaseConfiguration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
    throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * @param conn                           The HBase connection.
   * @param conf                           The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
    int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // Total number of attempts we can make: one more than the configured retry number.
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor = Executors.newScheduledThreadPool(initThreads,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
    this.conf = conf;
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already been
   * closed.
   * @throws IOException If there is an error closing the connection.
   */
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
   * is already full.
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.maxAttempts);
  }

  /**
   * The put requests will be buffered by their corresponding buffer queues. Returns the list of
   * puts which could not be queued.
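   * <p>
   * A minimal sketch of checking for rejected puts. The table name {@code "t1"} and the variable
   * names are illustrative only, and {@code puts} is assumed to be a prepared {@code List<Put>}:
   *
   * <pre>{@code
   * List<Put> rejected = multiplexer.put(TableName.valueOf("t1"), puts);
   * if (rejected != null) {
   *   // These puts did not fit into their buffer queues; the caller must handle them,
   *   // e.g. retry later or write them through a regular Table / BufferedMutator.
   * }
   * }</pre>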
   * @return the list of puts which could not be queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) return null;

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.maxAttempts);
      if (!result) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * @deprecated Use {@link #put(TableName, List) } instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue and retried up to the given
   * number of attempts before the request is dropped. Return false if the queue is already full.
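   * <p>
   * For example, to allow at most three attempts for a single put (the count of 3 and the table
   * name are illustrative choices only):
   *
   * <pre>{@code
   * boolean accepted = multiplexer.put(TableName.valueOf("t1"), put, 3);
   * }</pre>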
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
    if (maxAttempts <= 0) {
      return false;
    }

    try {
      ConnectionUtils.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
      // as necessary. We shouldn't have to do that here.
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put pair into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * @deprecated Use {@link #put(TableName, Put) } instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @deprecated Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /** Returns the current HTableMultiplexerStatus */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  @InterfaceAudience.Private
  @SuppressWarnings("FutureReturnValueIgnored")
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
            pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  @InterfaceAudience.Private
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. It reports
   * the number of buffered requests and the number of failed (dropped) requests, in total or on a
   * per region server basis.
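   * <p>
   * A minimal sketch of reading these metrics (assuming an existing {@code multiplexer} instance):
   *
   * <pre>{@code
   * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
   * long buffered = status.getTotalBufferedCounter();
   * long dropped = status.getTotalFailedCounter();
   * Map<String, Long> droppedPerServer = status.getFailedCounterForEachRegionServer();
   * }</pre>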
   * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
   *             {@link BufferedMutator} for batching mutations.
   */
  @Deprecated
  @InterfaceAudience.Public
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<>();
      this.serverToFailedCounterMap = new HashMap<>();
      this.serverToAverageLatencyMap = new HashMap<>();
      this.serverToMaxLatencyMap = new HashMap<>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  @InterfaceAudience.Private
  static class PutStatus {
    final RegionInfo regionInfo;
    final Put put;
    final int maxAttempCount;

    public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttempCount = maxAttempCount;
    }
  }

  /**
   * Helper to compute a running average over an interval, until reset.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  @InterfaceAudience.Private
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
    private final int operationTimeout;
    private final ExecutorService pool;

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
      HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool,
      ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      final ConnectionConfiguration connectionConfig =
        conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
        connectionConfig, conn == null ? null : conn.getConnectionMetrics());
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
      // Specify 0 retries in AsyncProcess because we need to reassign puts to different queues
      // if regions are moved.
      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
      this.pool = pool;
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
      return (long) queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttempCount - 1;

      if (retryCount <= 0) {
        // Out of attempts for this put; give up. The caller counts it as failed.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many puts are already queued for resubmission; give up on this one.
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The current put failed, so look up its table name for the resubmit.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put originally failed with some exception. If we keep
      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
      // that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    @InterfaceAudience.Private
    long getNextDelay(int retryCount) {
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
        multiplexer.maxAttempts - retryCount - 1);
    }

    @InterfaceAudience.Private
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @InterfaceAudience.Private
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @InterfaceAudience.Private
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @InterfaceAudience.Private
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @InterfaceAudience.Private
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.isEmpty()) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put succeeds or is resubmitted.
        failedCount = processingList.size();

        List<Action> retainedActions = new ArrayList<>(processingList.size());
        MultiAction actions = new MultiAction();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action action = new Action(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions);
        try {
          AsyncProcessTask task = AsyncProcessTask.newBuilder().setResults(results).setPool(pool)
            .setRpcTimeout(writeRpcTimeout).setOperationTimeout(operationTimeout).build();
          AsyncRequestFuture arf =
            ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
              + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Processed " + currentProcessingCount + " put requests for " + addr.getHostnamePort()
              + " and " + failedCount + " failed" + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}