/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * This replication process waits for the edits to be applied before the method returns. This means
 * that the replication of edits is synchronized (after reading from WALs in ReplicationSource) and
 * that a single region server cannot receive edits from two sources at the same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile AsyncClusterConnection sharedConn;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  private boolean replicationSinkTrackerEnabled;

  private final RegionServerCoprocessorHost rsServerHost;

  /**
   * Create a sink for replication
   * @param conf conf object
   * @throws IOException thrown when HDFS goes bad or bad file name
   */
  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
    throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    this.rsServerHost = rsServerHost;
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
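      // Instantiate the configured provider reflectively so that deployments can plug in a
      // custom SourceFSConfigurationProvider implementation.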
      Class<? extends SourceFSConfigurationProvider> c =
        Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
      this.provider = c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className + " throws error.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type saving on a conversion from pb to pojo.
   * @param entries                    WAL entries to be replicated.
   * @param cells                      cell scanner for iteration.
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        ExtendedCell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          ExtendedCell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of pairs of family and list of
              // hfile paths from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

  /*
   * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not.
   * If false, then ignore this cell. If set to true, de-serialize the value into
   * ReplicationMarkerDescriptor. Create a Put mutation with regionserver name, walname, offset and
   * timestamp from ReplicationMarkerDescriptor.
   */
  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
    // If source is emitting replication marker rows but sink is not accepting them,
    // ignore the edits.
    if (!replicationSinkTrackerEnabled) {
      return null;
    }
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength()));
    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
      (Bytes.toBytes(descriptor.getRegionServerName())));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getWalName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(cell.getTimestamp()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getOffset()));
    return put;
  }

  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

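  /**
   * Builds the HFile path relative to the source base namespace directory, in the form
   * namespace/table/encoded-region-name/family/storefile.
   */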
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns True if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final ExtendedCell previousCell, final ExtendedCell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general
   * utility method
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop this sink and close the shared cluster connection. It is called when the regionserver is
   * stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConn != null) {
        synchronized (sharedConnLock) {
          if (this.sharedConn != null) {
            this.sharedConn.close();
            this.sharedConn = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given rows against their table, partitioning any batch that exceeds the row size
   * threshold.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }
    // Here we will always wait until all futures are finished, even if there are failures when
    // getting from a future in the middle. This is because this method may be called in a rpc call,
    // so the batch operations may reference some off heap cells(through CellScanner). If we return
    // earlier here, the rpc call may be finished and they will release the off heap cells before
    // some of the batch operations finish, and then cause corrupt data or even crash the region
    // server. See HBASE-28584 and HBASE-28850 for more details.
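    // Remember the first failure and attach any later ones as suppressed exceptions, so every
    // future is drained before the error is rethrown.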
    IOException error = null;
    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        IOException ioe;
        if (e.getCause() instanceof TableNotFoundException) {
          ioe = new TableNotFoundException("'" + tableName + "'");
        } else {
          ioe = e;
        }
        if (error == null) {
          error = ioe;
        } else {
          error.addSuppressed(ioe);
        }
      }
    }
    if (error != null) {
      throw error;
    }
  }

  private AsyncClusterConnection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncClusterConnection connection = sharedConn;
    if (connection == null) {
      synchronized (sharedConnLock) {
        connection = sharedConn;
        if (connection == null) {
          connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
            UserProvider.instantiate(conf).getCurrent());
          sharedConn = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date of the last edit that was
   *         applied
   */
  public String getStats() {
    long total = this.totalReplicatedEdits.get();
    return total == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + total;
  }

  /**
   * Get replication Sink Metrics
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}