/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * This replication process currently waits for the edits to be applied before the method returns.
 * This means that the replication of edits is synchronized (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  /**
   * This shared {@link Connection} is used for handling bulk load hfiles replication.
   */
  private volatile Connection sharedConnection;
  /**
   * This shared {@link AsyncConnection} is used for handling wal replication.
   */
  private volatile AsyncConnection sharedAsyncConnection;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnectionLock = new Object();
  private final Object sharedAsyncConnectionLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  private boolean replicationSinkTrackerEnabled;

  private final RegionServerCoprocessorHost rsServerHost;

  /**
   * Create a sink for replication
   * @param conf the configuration object
   * @throws IOException if the configured WAL entry sink filter fails to initialize
   */
  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
    throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    this.rsServerHost = rsServerHost;
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className + " throws error.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }
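
  // Illustrative sink-side overrides for the two knobs decorateConf() reads above; the values
  // shown are this class's own defaults, not recommendations. An operator would place these in
  // the sink cluster's hbase-site.xml:
  //
  //   <property>
  //     <name>replication.sink.client.retries.number</name>
  //     <value>4</value>
  //   </property>
  //   <property>
  //     <name>replication.sink.client.ops.timeout</name>
  //     <value>10000</value> <!-- milliseconds -->
  //   </property>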

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against the raw protobuf types, saving on a conversion from pb to pojo.
   * @param entries                    WAL entries to be replicated.
   * @param cells                      cell scanner for iteration.
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of pairs of family and hfile paths from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }
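
  // A rough sketch of the tracker row that processReplicationMarkerEntry() below produces and
  // that replicateEntries() routes to REPLICATION_SINK_TRACKER_TABLE_NAME. The family and
  // qualifiers are the constants imported from ReplicationSinkTrackerTableCreator, and every
  // cell carries the marker cell's timestamp:
  //
  //   row key copied from the marker cell, family REPLICATION_SINK_TRACKER_INFO_FAMILY:
  //     RS_COLUMN        -> region server name from the descriptor
  //     WAL_NAME_COLUMN  -> WAL file name from the descriptor
  //     TIMESTAMP_COLUMN -> the marker cell's timestamp
  //     OFFSET_COLUMN    -> offset in the source WAL from the descriptor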

  /*
   * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not.
   * If false, then ignore this cell. If set to true, de-serialize the value into a
   * ReplicationMarkerDescriptor and create a Put mutation with the region server name, wal name,
   * offset and timestamp from that descriptor.
   */
  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
    // If source is emitting replication marker rows but sink is not accepting them,
    // ignore the edits.
    if (!replicationSinkTrackerEnabled) {
      return null;
    }
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength()));
    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getRegionServerName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getWalName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(cell.getTimestamp()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getOffset()));
    return put;
  }

  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }
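
  // getHFilePath() below builds the hfile path relative to the source namespace directory by
  // joining its parts with Path.SEPARATOR, i.e.:
  //   <namespace>/<table qualifier>/<encoded region name>/<family>/<store file>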

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns True if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general
   * utility method.
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connections. It is called when the
   * regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConnection != null) {
        synchronized (sharedConnectionLock) {
          if (this.sharedConnection != null) {
            this.sharedConnection.close();
            this.sharedConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedConnection", e);
    }

    try {
      if (this.sharedAsyncConnection != null) {
        synchronized (sharedAsyncConnectionLock) {
          if (this.sharedAsyncConnection != null) {
            this.sharedAsyncConnection.close();
            this.sharedAsyncConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedAsyncConnection", e);
    }
  }

  /**
   * Apply the given batches of Rows against their table.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getAsyncConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      // Split oversized lists into sub-lists of at most batchRowSizeThreshold rows so each
      // batch RPC stays below the warning threshold.
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }

    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }
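
  // Both connection getters below lazily create their connection with double-checked locking:
  // the first read of the volatile field avoids taking the lock on the common path, and the
  // second read under the lock ensures only one connection is ever created.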

  /**
   * Return the shared {@link Connection} which is used for handling bulk load hfiles replication.
   */
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedConnection;
    if (connection == null) {
      synchronized (sharedConnectionLock) {
        connection = sharedConnection;
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf);
          sharedConnection = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Return the shared {@link AsyncConnection} which is used for handling wal replication.
   */
  private AsyncConnection getAsyncConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncConnection asyncConnection = sharedAsyncConnection;
    if (asyncConnection == null) {
      synchronized (sharedAsyncConnectionLock) {
        asyncConnection = sharedAsyncConnection;
        if (asyncConnection == null) {
          // Get the AsyncConnection immediately.
          asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
          sharedAsyncConnection = asyncConnection;
        }
      }
    }
    return asyncConnection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date of the last edit that was
   *         applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}