/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process currently waits for the edits to be applied before the method returns.
 * This means that the replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
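 * <p>
 * A minimal usage sketch (a hypothetical standalone caller; in a running cluster this class is
 * driven by the region server's replication sink service, which feeds it entries received over
 * the replication RPC):
 * </p>
 *
 * <pre>
 * {@code
 * Configuration conf = HBaseConfiguration.create();
 * // A null coprocessor host is accepted; used here for illustration only.
 * ReplicationSink sink = new ReplicationSink(conf, null);
 * sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath,
 *   sourceHFileArchiveDirPath);
 * LOG.info(sink.getStats());
 * sink.stopReplicationSinkServices();
 * }
 * </pre>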
 * TODO: make this class more like ReplicationSource with respect to log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of double-checked locking; see:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  /**
   * This shared {@link Connection} is used for handling bulk load hfiles replication.
   */
  private volatile Connection sharedConnection;
  /**
   * This shared {@link AsyncConnection} is used for handling wal replication.
   */
  private volatile AsyncConnection sharedAsyncConnection;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnectionLock = new Object();
  private final Object sharedAsyncConnectionLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
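   * <p>
   * Read from {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}, defaulting to
   * {@link HConstants#BATCH_ROWS_THRESHOLD_DEFAULT}.
   * </p>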
   */
  private final int rowSizeWarnThreshold;
  private boolean replicationSinkTrackerEnabled;

  private final RegionServerCoprocessorHost rsServerHost;

  /**
   * Create a sink for replication.
   * @param conf         configuration object
   * @param rsServerHost region server coprocessor host for running the replication sink
   *                     coprocessor hooks
   * @throws IOException if the sink cannot be initialized
   */
  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
    throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    this.rsServerHost = rsServerHost;
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
        + className + " could not be instantiated.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      // Include the exception so the cause of the instantiation failure is not lost.
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and the number of retries.
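   * <p>
   * The sink-side overrides applied here can themselves be tuned; a sketch (the values shown are
   * the defaults used below, not recommendations):
   * </p>
   *
   * <pre>
   * {@code
   * conf.setInt("replication.sink.client.retries.number", 4);
   * conf.setInt("replication.sink.client.ops.timeout", 10000);
   * }
   * </pre>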
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. It
   * operates only against the raw protobuf types, saving a conversion from protobuf to POJO.
   * @param entries                    WAL entries to be replicated.
   * @param cells                      cell scanner for iteration.
   * @param replicationClusterId       Id which uniquely identifies the source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException if the data cannot be replicated
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flush once per
      // table and cluster id per invocation of this method.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
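      // Parallel lists pairing each mutation with the WAL entry it came from, so the
      // coprocessor post-batch-mutate hooks can run after the mutations have been applied.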
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name => list of (family, list of hfile paths) pairs
              // from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

  /*
   * First check whether the config key hbase.regionserver.replication.sink.tracker.enabled is set.
   * If false, ignore this cell. If true, de-serialize the cell value into a
   * ReplicationMarkerDescriptor and create a Put mutation with the region server name, wal name,
   * offset and timestamp from the descriptor.
   */
  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
    // If source is emitting replication marker rows but sink is not accepting them,
    // ignore the edits.
    if (!replicationSinkTrackerEnabled) {
      return null;
    }
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength()));
    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
      (Bytes.toBytes(descriptor.getRegionServerName())));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getWalName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(cell.getTimestamp()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getOffset()));
    return put;
  }

  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns True if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general
   * utility method.
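   * <p>
   * A usage sketch mirroring how mutations are grouped by table and cluster ids in
   * {@link #replicateEntries} above:
   * </p>
   *
   * <pre>
   * {@code
   * Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
   * addToHashMultiMap(rowMap, tableName, clusterIds, mutation);
   * }
   * </pre>
   *
   * @return the list of values corresponding to key1 and key2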
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services by closing the shared connections. Called when the region
   * server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConnection != null) {
        synchronized (sharedConnectionLock) {
          if (this.sharedConnection != null) {
            this.sharedConnection.close();
            this.sharedConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedConnection", e);
    }

    try {
      if (this.sharedAsyncConnection != null) {
        synchronized (sharedAsyncConnectionLock) {
          if (this.sharedAsyncConnection != null) {
            this.sharedAsyncConnection.close();
            this.sharedAsyncConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedAsyncConnection", e);
    }
  }

  /**
   * Apply the given mutations to the given table as asynchronous batches.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold row count threshold; lists larger than this are partitioned into
   *                              multiple batches
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getAsyncConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }

    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }

  /**
   * Return the shared {@link Connection} which is used for handling bulk load hfiles replication.
   */
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedConnection;
    if (connection == null) {
      synchronized (sharedConnectionLock) {
        connection = sharedConnection;
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf);
          sharedConnection = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Return the shared {@link AsyncConnection} which is used for handling wal replication.
   */
  private AsyncConnection getAsyncConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncConnection asyncConnection = sharedAsyncConnection;
    if (asyncConnection == null) {
      synchronized (sharedAsyncConnectionLock) {
        asyncConnection = sharedAsyncConnection;
        if (asyncConnection == null) {
          // Block on the future so we get the AsyncConnection immediately.
          asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
          sharedAsyncConnection = asyncConnection;
        }
      }
    }
    return asyncConnection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age in ms of the last applied
   *         edit, or the empty string if nothing has been replicated yet
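   *         <p>
   *         Example output (illustrative values):
   *         {@code Sink: age in ms of last applied edit: 3, total replicated edits: 1024}
   *         </p>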
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get the replication sink metrics.
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}