001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Future; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicLong; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.CellScanner; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseIOException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.RegionLocations; 042import org.apache.hadoop.hbase.TableDescriptors; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.TableNotFoundException; 045import org.apache.hadoop.hbase.client.ClusterConnection; 046import org.apache.hadoop.hbase.client.ConnectionFactory; 047import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionReplicaUtil; 050import org.apache.hadoop.hbase.client.RetryingCallable; 051import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.ipc.HBaseRpcController; 054import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 055import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; 056import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 057import org.apache.hadoop.hbase.replication.WALEntryFilter; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.Pair; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.hbase.wal.EntryBuffers; 062import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; 063import org.apache.hadoop.hbase.wal.OutputSink; 064import org.apache.hadoop.hbase.wal.WAL.Entry; 065import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; 066import org.apache.hadoop.util.StringUtils; 067import org.apache.yetus.audience.InterfaceAudience; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 072import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 073import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 074 075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 077 078/** 079 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL 080 * edits from the WAL, and sends the edits to replicas of regions. 081 */ 082@InterfaceAudience.Private 083public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { 084 085 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class); 086 087 // Can be configured differently than hbase.client.retries.number 088 private static String CLIENT_RETRIES_NUMBER = 089 "hbase.region.replica.replication.client.retries.number"; 090 091 private Configuration conf; 092 private ClusterConnection connection; 093 private TableDescriptors tableDescriptors; 094 095 // Reuse WALSplitter constructs as a WAL pipe 096 private PipelineController controller; 097 private RegionReplicaOutputSink outputSink; 098 private EntryBuffers entryBuffers; 099 100 // Number of writer threads 101 private int numWriterThreads; 102 103 private int operationTimeout; 104 105 private ExecutorService pool; 106 107 @Override 108 public void init(Context context) throws IOException { 109 super.init(context); 110 111 this.conf = HBaseConfiguration.create(context.getConfiguration()); 112 this.tableDescriptors = context.getTableDescriptors(); 113 114 // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. 115 // We are resetting it here because we want default number of retries (35) rather than 10 times 116 // that which makes very long retries for disabled tables etc. 117 int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 118 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 119 if (defaultNumRetries > 10) { 120 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 121 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 122 defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already 123 } 124 125 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 126 int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); 127 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); 128 129 this.numWriterThreads = this.conf.getInt("hbase.region.replica.replication.writer.threads", 3); 130 controller = new PipelineController(); 131 entryBuffers = new EntryBuffers(controller, 132 this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024)); 133 134 // use the regular RPC timeout for replica replication RPC's 135 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 136 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 137 } 138 139 @Override 140 protected void doStart() { 141 try { 142 connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); 143 this.pool = getDefaultThreadPool(conf); 144 outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers, 145 connection, pool, numWriterThreads, operationTimeout); 146 outputSink.startWriterThreads(); 147 super.doStart(); 148 } catch (IOException ex) { 149 LOG.warn("Received exception while creating connection :" + ex); 150 notifyFailed(ex); 151 } 152 } 153 154 @Override 155 protected void doStop() { 156 if (outputSink != null) { 157 try { 158 outputSink.close(); 159 } catch (IOException ex) { 160 LOG.warn("Got exception while trying to close OutputSink", ex); 161 } 162 } 163 if (this.pool != null) { 164 this.pool.shutdownNow(); 165 try { 166 // wait for 10 sec 167 boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS); 168 if (!shutdown) { 169 LOG.warn("Failed to shutdown the thread pool after 10 seconds"); 170 } 171 } catch (InterruptedException e) { 172 LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e); 173 } 174 } 175 if (connection != null) { 176 try { 177 connection.close(); 178 } catch (IOException ex) { 179 LOG.warn("Got exception closing connection :" + ex); 180 } 181 } 182 super.doStop(); 183 } 184 185 /** 186 * Returns a Thread pool for the RPC's to region replicas. Similar to Connection's thread pool. 187 */ 188 private ExecutorService getDefaultThreadPool(Configuration conf) { 189 int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); 190 if (maxThreads == 0) { 191 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 192 } 193 long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); 194 LinkedBlockingQueue<Runnable> workQueue = 195 new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 196 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 197 ThreadPoolExecutor tpe = 198 new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, 199 new ThreadFactoryBuilder() 200 .setNameFormat(this.getClass().getSimpleName() + "-rpc-shared-pool-%d") 201 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 202 tpe.allowCoreThreadTimeOut(true); 203 return tpe; 204 } 205 206 @Override 207 public boolean replicate(ReplicateContext replicateContext) { 208 /* 209 * A note on batching in RegionReplicaReplicationEndpoint (RRRE): RRRE relies on batching from 210 * two different mechanisms. The first is the batching from ReplicationSource since RRRE is a 211 * ReplicationEndpoint driven by RS. RS reads from a single WAL file filling up a buffer of heap 212 * size "replication.source.size.capacity"(64MB) or at most "replication.source.nb.capacity" 213 * entries or until it sees the end of file (in live tailing). Then RS passes all the buffered 214 * edits in this replicate() call context. RRRE puts the edits to the WALSplitter.EntryBuffers 215 * which is a blocking buffer space of up to "hbase.region.replica.replication.buffersize" 216 * (128MB) in size. This buffer splits the edits based on regions. There are 217 * "hbase.region.replica.replication.writer.threads"(default 3) writer threads which pick 218 * largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink). The 219 * SinkWriter in this case will send the wal edits to all secondary region replicas in parallel 220 * via a retrying rpc call. EntryBuffers guarantees that while a buffer is being written to the 221 * sink, another buffer for the same region will not be made available to writers ensuring 222 * regions edits are not replayed out of order. The replicate() call won't return until all the 223 * buffers are sent and ack'd by the sinks so that the replication can assume all edits are 224 * persisted. We may be able to do a better pipelining between the replication thread and output 225 * sinks later if it becomes a bottleneck. 226 */ 227 228 while (this.isRunning()) { 229 try { 230 for (Entry entry : replicateContext.getEntries()) { 231 entryBuffers.appendEntry(entry); 232 } 233 outputSink.flush(); // make sure everything is flushed 234 ctx.getMetrics().incrLogEditsFiltered(outputSink.getSkippedEditsCounter().getAndSet(0)); 235 return true; 236 } catch (InterruptedException e) { 237 Thread.currentThread().interrupt(); 238 return false; 239 } catch (IOException e) { 240 LOG.warn( 241 "Received IOException while trying to replicate" + StringUtils.stringifyException(e)); 242 outputSink.restartWriterThreadsIfNeeded(); 243 } 244 } 245 246 return false; 247 } 248 249 @Override 250 public boolean canReplicateToSameCluster() { 251 return true; 252 } 253 254 @Override 255 protected WALEntryFilter getScopeWALEntryFilter() { 256 // we do not care about scope. We replicate everything. 257 return null; 258 } 259 260 static class RegionReplicaOutputSink extends OutputSink { 261 private final RegionReplicaSinkWriter sinkWriter; 262 private final TableDescriptors tableDescriptors; 263 private final Cache<TableName, Boolean> memstoreReplicationEnabled; 264 265 public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors, 266 EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, int numWriters, 267 int operationTimeout) { 268 super(controller, entryBuffers, numWriters); 269 this.sinkWriter = 270 new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors); 271 this.tableDescriptors = tableDescriptors; 272 273 // A cache for the table "memstore replication enabled" flag. 274 // It has a default expiry of 5 sec. This means that if the table is altered 275 // with a different flag value, we might miss to replicate for that amount of 276 // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. 277 int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration() 278 .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); 279 this.memstoreReplicationEnabled = CacheBuilder.newBuilder() 280 .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) 281 .initialCapacity(10).maximumSize(1000).build(); 282 } 283 284 @Override 285 public void append(RegionEntryBuffer buffer) throws IOException { 286 List<Entry> entries = buffer.getEntries(); 287 288 if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { 289 return; 290 } 291 292 // meta edits (e.g. flush) are always replicated. 293 // data edits (e.g. put) are replicated if the table requires them. 294 if (!requiresReplication(buffer.getTableName(), entries)) { 295 return; 296 } 297 298 sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), 299 CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries); 300 } 301 302 void flush() throws IOException { 303 // nothing much to do for now. Wait for the Writer threads to finish up 304 // append()'ing the data. 305 entryBuffers.waitUntilDrained(); 306 } 307 308 @Override 309 public boolean keepRegionEvent(Entry entry) { 310 return true; 311 } 312 313 @Override 314 public List<Path> close() throws IOException { 315 finishWriterThreads(true); 316 return null; 317 } 318 319 @Override 320 public Map<String, Long> getOutputCounts() { 321 return null; // only used in tests 322 } 323 324 @Override 325 public int getNumberOfRecoveredRegions() { 326 return 0; 327 } 328 329 AtomicLong getSkippedEditsCounter() { 330 return totalSkippedEdits; 331 } 332 333 /** 334 * returns true if the specified entry must be replicated. We should always replicate meta 335 * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the 336 * memstore. 337 */ 338 private boolean requiresReplication(final TableName tableName, final List<Entry> entries) 339 throws IOException { 340 // unit-tests may not the TableDescriptors, bypass the check and always replicate 341 if (tableDescriptors == null) return true; 342 343 Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName); 344 if (requiresReplication == null) { 345 // check if the table requires memstore replication 346 // some unit-test drop the table, so we should do a bypass check and always replicate. 347 TableDescriptor htd = tableDescriptors.get(tableName); 348 requiresReplication = htd == null || htd.hasRegionMemStoreReplication(); 349 memstoreReplicationEnabled.put(tableName, requiresReplication); 350 } 351 352 // if memstore replication is not required, check the entries. 353 // meta edits (e.g. flush) must be always replicated. 354 if (!requiresReplication) { 355 int skipEdits = 0; 356 java.util.Iterator<Entry> it = entries.iterator(); 357 while (it.hasNext()) { 358 Entry entry = it.next(); 359 if (entry.getEdit().isMetaEdit()) { 360 requiresReplication = true; 361 } else { 362 it.remove(); 363 skipEdits++; 364 } 365 } 366 totalSkippedEdits.addAndGet(skipEdits); 367 } 368 return requiresReplication; 369 } 370 371 @Override 372 protected int getNumOpenWriters() { 373 // TODO Auto-generated method stub 374 return 0; 375 } 376 } 377 378 static class RegionReplicaSinkWriter { 379 RegionReplicaOutputSink sink; 380 ClusterConnection connection; 381 RpcControllerFactory rpcControllerFactory; 382 RpcRetryingCallerFactory rpcRetryingCallerFactory; 383 int operationTimeout; 384 ExecutorService pool; 385 Cache<TableName, Boolean> disabledAndDroppedTables; 386 TableDescriptors tableDescriptors; 387 388 public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, 389 ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) { 390 this.sink = sink; 391 this.connection = connection; 392 this.operationTimeout = operationTimeout; 393 this.rpcRetryingCallerFactory = 394 RpcRetryingCallerFactory.instantiate(connection.getConfiguration(), 395 connection.getConnectionConfiguration(), connection.getConnectionMetrics()); 396 this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); 397 this.pool = pool; 398 this.tableDescriptors = tableDescriptors; 399 400 int nonExistentTableCacheExpiryMs = connection.getConfiguration() 401 .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); 402 // A cache for non existing tables that have a default expiry of 5 sec. This means that if the 403 // table is created again with the same name, we might miss to replicate for that amount of 404 // time. But this cache prevents overloading meta requests for every edit from a deleted file. 405 disabledAndDroppedTables = CacheBuilder.newBuilder() 406 .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10) 407 .maximumSize(1000).build(); 408 } 409 410 public void append(TableName tableName, byte[] encodedRegionName, byte[] row, 411 List<Entry> entries) throws IOException { 412 413 if (disabledAndDroppedTables.getIfPresent(tableName) != null) { 414 if (LOG.isTraceEnabled()) { 415 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName 416 + " is cached as a disabled or dropped table"); 417 for (Entry entry : entries) { 418 LOG.trace("Skipping : " + entry); 419 } 420 } 421 sink.getSkippedEditsCounter().addAndGet(entries.size()); 422 return; 423 } 424 425 // If the table is disabled or dropped, we should not replay the entries, and we can skip 426 // replaying them. However, we might not know whether the table is disabled until we 427 // invalidate the cache and check from meta 428 RegionLocations locations = null; 429 boolean useCache = true; 430 while (true) { 431 // get the replicas of the primary region 432 try { 433 locations = 434 RegionReplicaReplayCallable.getRegionLocations(connection, tableName, row, useCache, 0); 435 if (locations == null) { 436 throw new HBaseIOException( 437 "Cannot locate locations for " + tableName + ", row:" + Bytes.toStringBinary(row)); 438 } 439 // Replicas can take a while to come online. The cache may have only the primary. If we 440 // keep going to the cache, we will not learn of the replicas and their locations after 441 // they come online. 442 if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) { 443 if (tableDescriptors.get(tableName).getRegionReplication() > 1) { 444 // Make an obnoxious log here. See how bad this issue is. Add a timer if happening 445 // too much. 446 LOG.info("Skipping location cache; only one location found for {}", tableName); 447 useCache = false; 448 continue; 449 } 450 } 451 } catch (TableNotFoundException e) { 452 if (LOG.isTraceEnabled()) { 453 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName 454 + " is dropped. Adding table to cache."); 455 for (Entry entry : entries) { 456 LOG.trace("Skipping : " + entry); 457 } 458 } 459 disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored 460 // skip this entry 461 sink.getSkippedEditsCounter().addAndGet(entries.size()); 462 return; 463 } 464 465 // check whether we should still replay this entry. If the regions are changed, or the 466 // entry is not coming from the primary region, filter it out. 467 HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); 468 if ( 469 !Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName) 470 ) { 471 if (useCache) { 472 useCache = false; 473 continue; // this will retry location lookup 474 } 475 if (LOG.isTraceEnabled()) { 476 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 477 + " because located region " + primaryLocation.getRegionInfo().getEncodedName() 478 + " is different than the original region " + Bytes.toStringBinary(encodedRegionName) 479 + " from WALEdit"); 480 for (Entry entry : entries) { 481 LOG.trace("Skipping : " + entry); 482 } 483 } 484 sink.getSkippedEditsCounter().addAndGet(entries.size()); 485 return; 486 } 487 break; 488 } 489 490 if (locations.size() == 1) { 491 return; 492 } 493 494 ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1); 495 496 // All passed entries should belong to one region because it is coming from the EntryBuffers 497 // split per region. But the regions might split and merge (unlike log recovery case). 498 for (int replicaId = 0; replicaId < locations.size(); replicaId++) { 499 HRegionLocation location = locations.getRegionLocation(replicaId); 500 if (!RegionReplicaUtil.isDefaultReplica(replicaId)) { 501 RegionInfo regionInfo = location == null 502 ? RegionReplicaUtil.getRegionInfoForReplica( 503 locations.getDefaultRegionLocation().getRegionInfo(), replicaId) 504 : location.getRegionInfo(); 505 RegionReplicaReplayCallable callable = 506 new RegionReplicaReplayCallable(connection, rpcControllerFactory, tableName, location, 507 regionInfo, row, entries, sink.getSkippedEditsCounter()); 508 Future<ReplicateWALEntryResponse> task = pool.submit( 509 new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout)); 510 tasks.add(task); 511 } 512 } 513 514 boolean tasksCancelled = false; 515 for (int replicaId = 0; replicaId < tasks.size(); replicaId++) { 516 try { 517 tasks.get(replicaId).get(); 518 } catch (InterruptedException e) { 519 throw new InterruptedIOException(e.getMessage()); 520 } catch (ExecutionException e) { 521 Throwable cause = e.getCause(); 522 boolean canBeSkipped = false; 523 if (cause instanceof IOException) { 524 // The table can be disabled or dropped at this time. For disabled tables, we have no 525 // cheap mechanism to detect this case because meta does not contain this information. 526 // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay 527 // RPC. So instead we start the replay RPC with retries and check whether the table is 528 // dropped or disabled which might cause SocketTimeoutException, or 529 // RetriesExhaustedException or similar if we get IOE. 530 if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) { 531 disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. 532 canBeSkipped = true; 533 } else if (tableDescriptors != null) { 534 TableDescriptor tableDescriptor = tableDescriptors.get(tableName); 535 if ( 536 tableDescriptor != null 537 // (replicaId + 1) as no task is added for primary replica for replication 538 && tableDescriptor.getRegionReplication() <= (replicaId + 1) 539 ) { 540 canBeSkipped = true; 541 } 542 } 543 if (canBeSkipped) { 544 if (LOG.isTraceEnabled()) { 545 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 546 + " because received exception for dropped or disabled table", cause); 547 for (Entry entry : entries) { 548 LOG.trace("Skipping : " + entry); 549 } 550 } 551 if (!tasksCancelled) { 552 sink.getSkippedEditsCounter().addAndGet(entries.size()); 553 tasksCancelled = true; // so that we do not add to skipped counter again 554 } 555 continue; 556 } 557 558 // otherwise rethrow 559 throw (IOException) cause; 560 } 561 // unexpected exception 562 throw new IOException(cause); 563 } 564 } 565 } 566 } 567 568 static class RetryingRpcCallable<V> implements Callable<V> { 569 RpcRetryingCallerFactory factory; 570 RetryingCallable<V> callable; 571 int timeout; 572 573 public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable, 574 int timeout) { 575 this.factory = factory; 576 this.callable = callable; 577 this.timeout = timeout; 578 } 579 580 @Override 581 public V call() throws Exception { 582 return factory.<V> newCaller().callWithRetries(callable, timeout); 583 } 584 } 585 586 /** 587 * Calls replay on the passed edits for the given set of entries belonging to the region. It skips 588 * the entry if the region boundaries have changed or the region is gone. 589 */ 590 static class RegionReplicaReplayCallable 591 extends RegionAdminServiceCallable<ReplicateWALEntryResponse> { 592 private final List<Entry> entries; 593 private final byte[] initialEncodedRegionName; 594 private final AtomicLong skippedEntries; 595 596 public RegionReplicaReplayCallable(ClusterConnection connection, 597 RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation location, 598 RegionInfo regionInfo, byte[] row, List<Entry> entries, AtomicLong skippedEntries) { 599 super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId()); 600 this.entries = entries; 601 this.skippedEntries = skippedEntries; 602 this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); 603 } 604 605 @Override 606 public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception { 607 // Check whether we should still replay this entry. If the regions are changed, or the 608 // entry is not coming form the primary region, filter it out because we do not need it. 609 // Regions can change because of (1) region split (2) region merge (3) table recreated 610 boolean skip = false; 611 if ( 612 !Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName) 613 ) { 614 skip = true; 615 } 616 if (!this.entries.isEmpty() && !skip) { 617 Entry[] entriesArray = new Entry[this.entries.size()]; 618 entriesArray = this.entries.toArray(entriesArray); 619 620 // set the region name for the target region replica 621 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = 622 ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, 623 location.getRegionInfo().getEncodedNameAsBytes(), null, null, null); 624 controller.setCellScanner(p.getSecond()); 625 return stub.replay(controller, p.getFirst()); 626 } 627 628 if (skip) { 629 if (LOG.isTraceEnabled()) { 630 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 631 + " because located region " + location.getRegionInfo().getEncodedName() 632 + " is different than the original region " 633 + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); 634 for (Entry entry : entries) { 635 LOG.trace("Skipping : " + entry); 636 } 637 } 638 skippedEntries.addAndGet(entries.size()); 639 } 640 return ReplicateWALEntryResponse.newBuilder().build(); 641 } 642 } 643}