/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process currently waits for the edits to be applied before the method returns.
 * This means that the replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
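 * <p>
 * A minimal usage sketch; the surrounding variable names are illustrative only. The entries and
 * the cell scanner typically arrive together in a ReplicateWALEntry RPC from the source cluster:
 * </p>
 *
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, rsServerHost);
 * sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath,
 *   sourceHFileArchiveDirPath);
 * LOG.info(sink.getStats());
 * sink.stopReplicationSinkServices();
 * </pre>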
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile AsyncClusterConnection sharedConn;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  private boolean replicationSinkTrackerEnabled;

  private final RegionServerCoprocessorHost rsServerHost;

  /**
   * Create a sink for replication.
   * @param conf         conf object
   * @param rsServerHost region server coprocessor host, used to invoke replication sink hooks
   * @throws IOException thrown when HDFS goes bad or bad file name
   */
  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
    throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    this.rsServerHost = rsServerHost;
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      Class<? extends SourceFSConfigurationProvider> c =
        Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
      this.provider = c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className + " throws error.", e);
    }
  }

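  /*
   * Creates the optional sink-side WAL entry filter. A hedged configuration sketch (the
   * implementation class name is illustrative, not a shipped class):
   *   conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, MyWALEntrySinkFilter.class,
   *     WALEntrySinkFilter.class);
   */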
  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
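    // With the defaults above the sink retries at most 4 times and gives each operation a
    // 10 second timeout; both can be overridden via the two replication.sink.client.* keys.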
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. It
   * operates only against the raw protobuf types, saving a conversion from PB to POJO.
   * @param entries                    WAL entries to be replicated.
   * @param cells                      cell scanner for iteration.
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flush once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
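      // Mutations paired with the WAL entries they came from, kept so the coprocessor post-hook
      // (postReplicationSinkBatchMutate) can be invoked after the batches below are applied.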
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        ExtendedCell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          ExtendedCell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of pairs of family and list of
              // hfile paths from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
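            // Marker rows are routed to the sink-local replication tracker table rather than the
            // table named in the source WAL entry.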
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

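      // The applied batch size reported to metrics counts both WAL entries and bulk loaded hfiles.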
      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

  /*
   * First check whether the REPLICATION_SINK_TRACKER_ENABLED_KEY config is set to true. If false,
   * ignore this cell. If true, de-serialize the cell value into a ReplicationMarkerDescriptor and
   * create a Put mutation with the regionserver name, wal name, offset and timestamp from that
   * descriptor.
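   * The caller routes the resulting Put to REPLICATION_SINK_TRACKER_TABLE_NAME; it carries one
   * column per field (RS_COLUMN, WAL_NAME_COLUMN, TIMESTAMP_COLUMN and OFFSET_COLUMN) under
   * REPLICATION_SINK_TRACKER_INFO_FAMILY, all stamped with the marker cell's timestamp.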
   */
  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
    // If source is emitting replication marker rows but sink is not accepting them,
    // ignore the edits.
    if (!replicationSinkTrackerEnabled) {
      return null;
    }
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength()));
    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
      (Bytes.toBytes(descriptor.getRegionServerName())));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getWalName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(cell.getTimestamp()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getOffset()));
    return put;
  }

  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

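  /*
   * Builds the hfile path relative to the source namespace directory, i.e.
   * namespace/table/encodedRegionName/family/storeFile, e.g. (values illustrative):
   *   default/usertable/0123456789abcdef0123456789abcdef/cf/f0e1d2c3b4a5
   */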
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns True if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final ExtendedCell previousCell, final ExtendedCell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general
   * utility method.
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection. It is called when the
   * regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConn != null) {
        synchronized (sharedConnLock) {
          if (this.sharedConn != null) {
            this.sharedConn.close();
            this.sharedConn = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the table, partitioning oversized row lists into smaller
   * batches.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }
    // Here we will always wait until all futures are finished, even if there are failures when
    // getting from a future in the middle. This is because this method may be called in an RPC
    // call, so the batch operations may reference some off-heap cells (through CellScanner). If we
    // return earlier here, the RPC call may be finished and will release the off-heap cells before
    // some of the batch operations finish, which can corrupt data or even crash the region server.
    // See HBASE-28584 and HBASE-28850 for more details.
    IOException error = null;
    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        IOException ioe;
        if (e.getCause() instanceof TableNotFoundException) {
          ioe = new TableNotFoundException("'" + tableName + "'");
        } else {
          ioe = e;
        }
        if (error == null) {
          error = ioe;
        } else {
          error.addSuppressed(ioe);
        }
      }
    }
    if (error != null) {
      throw error;
    }
  }

  private AsyncClusterConnection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncClusterConnection connection = sharedConn;
    if (connection == null) {
      synchronized (sharedConnLock) {
        connection = sharedConn;
        if (connection == null) {
          connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
            UserProvider.instantiate(conf).getCurrent());
          sharedConn = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date of the last edit that was
   *         applied
   */
  public String getStats() {
    long total = this.totalReplicatedEdits.get();
    return total == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + total;
  }

  /**
   * Get replication Sink Metrics
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}