/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.EntryBuffers;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.OutputSink;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} which receives edits from the
 * WAL and sends them to the secondary replicas of the regions they belong to.
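 * <p>
 * As an overview, the endpoint is tuned through the configuration keys read in this class
 * (defaults in parentheses, as used below):
 * <pre>
 *   hbase.region.replica.replication.writer.threads         (3 writer threads)
 *   hbase.region.replica.replication.buffersize              (128 MB entry buffer)
 *   hbase.region.replica.replication.client.retries.number   (defaults to hbase.client.retries.number)
 *   hbase.region.replica.replication.threads.max              (256; 0 means 8 * available processors)
 * </pre>
 * Only tables whose descriptor enables region memstore replication have their data edits
 * replayed; meta edits (e.g. flush markers) are always replayed.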
 */
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);

  // Can be configured differently from hbase.client.retries.number
  private static final String CLIENT_RETRIES_NUMBER =
    "hbase.region.replica.replication.client.retries.number";

  private Configuration conf;
  private ClusterConnection connection;
  private TableDescriptors tableDescriptors;

  // Reuse WALSplitter constructs as a WAL pipe
  private PipelineController controller;
  private RegionReplicaOutputSink outputSink;
  private EntryBuffers entryBuffers;

  // Number of writer threads
  private int numWriterThreads;

  private int operationTimeout;

  private ExecutorService pool;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);

    this.conf = HBaseConfiguration.create(context.getConfiguration());
    this.tableDescriptors = context.getTableDescriptors();

    // The region server multiplies client retries by 10 globally for meta operations, but we do
    // not want that here. We reset it because we want the default number of retries (35) rather
    // than 10 times that, which makes retries for disabled tables etc. take very long.
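    // For example, with the default of 35 retries and a server-side multiplier of 10, the
    // effective value seen here would be 350; dividing by the multiplier restores the intended 35.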
    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (defaultNumRetries > 10) {
      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
    }

    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);

    this.numWriterThreads = this.conf.getInt("hbase.region.replica.replication.writer.threads", 3);
    controller = new PipelineController();
    entryBuffers = new EntryBuffers(controller,
      this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));

    // use the regular RPC timeout for replica replication RPCs
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  @Override
  protected void doStart() {
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
      this.pool = getDefaultThreadPool(conf);
      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
        connection, pool, numWriterThreads, operationTimeout);
      outputSink.startWriterThreads();
      super.doStart();
    } catch (IOException ex) {
      LOG.warn("Received exception while creating connection", ex);
      notifyFailed(ex);
    }
  }

  @Override
  protected void doStop() {
    if (outputSink != null) {
      try {
        outputSink.close();
      } catch (IOException ex) {
        LOG.warn("Got exception while trying to close OutputSink", ex);
      }
    }
    if (this.pool != null) {
      this.pool.shutdownNow();
      try {
        // wait for 10 sec
        boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
        if (!shutdown) {
          LOG.warn("Failed to shut down the thread pool after 10 seconds");
        }
      } catch (InterruptedException e) {
        LOG.warn("Got interrupted while waiting for the thread pool to shut down", e);
        Thread.currentThread().interrupt();
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException ex) {
        LOG.warn("Got exception closing connection", ex);
      }
    }
    super.doStop();
  }

  /**
   * Returns a thread pool for the RPCs to region replicas. Similar to Connection's thread pool.
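   * The pool is bounded by "hbase.region.replica.replication.threads.max" (a value of 0 means
   * 8 * the number of available processors) and its work queue is sized at that maximum times
   * {@link HConstants#HBASE_CLIENT_MAX_TOTAL_TASKS}.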
   */
  private ExecutorService getDefaultThreadPool(Configuration conf) {
    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
    LinkedBlockingQueue<Runnable> workQueue =
      new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
    ThreadPoolExecutor tpe =
      new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
        new ThreadFactoryBuilder()
          .setNameFormat(this.getClass().getSimpleName() + "-rpc-shared-pool-%d")
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    /*
     * A note on batching in RegionReplicaReplicationEndpoint (RRRE): RRRE relies on batching from
     * two different mechanisms. The first is the batching from ReplicationSource, since RRRE is a
     * ReplicationEndpoint driven by the RS. The RS reads from a single WAL file, filling up a
     * buffer of heap size "replication.source.size.capacity" (64MB) or at most
     * "replication.source.nb.capacity" entries, or until it sees the end of file (in live
     * tailing). Then the RS passes all the buffered edits in this replicate() call context. RRRE
     * puts the edits into EntryBuffers, which is a blocking buffer space of up to
     * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
     * based on regions. There are "hbase.region.replica.replication.writer.threads" (default 3)
     * writer threads which pick the largest per-region buffer and send it to the SinkWriter (see
     * RegionReplicaOutputSink). The SinkWriter in this case will send the WAL edits to all
     * secondary region replicas in parallel via a retrying RPC call. EntryBuffers guarantees that
     * while a buffer is being written to the sink, another buffer for the same region will not be
     * made available to writers, ensuring a region's edits are not replayed out of order. The
     * replicate() call won't return until all the buffers are sent and ack'd by the sinks, so that
     * replication can assume all edits are persisted. We may be able to do better pipelining
     * between the replication thread and output sinks later if it becomes a bottleneck.
     */
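    // In short, summarizing the note above with the defaults it names:
    //   ReplicationSource batch (up to 64MB or "replication.source.nb.capacity" entries)
    //     -> entryBuffers.appendEntry() (grouped per region; blocks once
    //        "hbase.region.replica.replication.buffersize", 128MB by default, is full)
    //     -> writer threads (3 by default) -> RegionReplicaOutputSink.append()
    //     -> RegionReplicaSinkWriter -> parallel retrying replay RPCs to the secondary replicas.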

    while (this.isRunning()) {
      try {
        for (Entry entry : replicateContext.getEntries()) {
          entryBuffers.appendEntry(entry);
        }
        outputSink.flush(); // make sure everything is flushed
        ctx.getMetrics().incrLogEditsFiltered(outputSink.getSkippedEditsCounter().getAndSet(0));
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      } catch (IOException e) {
        LOG.warn("Received IOException while trying to replicate: "
          + StringUtils.stringifyException(e));
        outputSink.restartWriterThreadsIfNeeded();
      }
    }

    return false;
  }

  @Override
  public boolean canReplicateToSameCluster() {
    return true;
  }

  @Override
  protected WALEntryFilter getScopeWALEntryFilter() {
    // we do not care about scope. We replicate everything.
    return null;
  }

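  /**
   * {@link OutputSink} implementation that hands each per-region buffer of entries to a
   * {@link RegionReplicaSinkWriter}, after filtering out data edits for tables that do not have
   * region memstore replication enabled.
   */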
  static class RegionReplicaOutputSink extends OutputSink {
    private final RegionReplicaSinkWriter sinkWriter;
    private final TableDescriptors tableDescriptors;
    private final Cache<TableName, Boolean> memstoreReplicationEnabled;

    public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
      EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, int numWriters,
      int operationTimeout) {
      super(controller, entryBuffers, numWriters);
      this.sinkWriter =
        new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
      this.tableDescriptors = tableDescriptors;

      // A cache for the table's "memstore replication enabled" flag.
      // It has a default expiry of 5 sec. This means that if the table is altered
      // with a different flag value, we might fail to replicate for that amount of
      // time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10).maximumSize(1000).build();
    }

    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.getEntries();

      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
        return;
      }

      // meta edits (e.g. flush) are always replicated.
      // data edits (e.g. put) are replicated only if the table requires them.
      if (!requiresReplication(buffer.getTableName(), entries)) {
        return;
      }

      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
    }

    void flush() throws IOException {
      // nothing much to do for now. Wait for the writer threads to finish
      // append()'ing the data.
      entryBuffers.waitUntilDrained();
    }

    @Override
    public boolean keepRegionEvent(Entry entry) {
      return true;
    }

    @Override
    public List<Path> close() throws IOException {
      finishWriterThreads(true);
      return null;
    }

    @Override
    public Map<String, Long> getOutputCounts() {
      return null; // only used in tests
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return 0;
    }

    AtomicLong getSkippedEditsCounter() {
      return totalSkippedEdits;
    }

    /**
     * Returns true if the specified entries must be replicated. Meta operations (e.g. flush) are
     * always replicated, while the table's region memstore replication flag decides whether data
     * edits are replicated.
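     * Entries that do not need to be replicated are removed from the passed list and counted in
     * {@code totalSkippedEdits}.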
     */
    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
      throws IOException {
      // unit tests may not have the TableDescriptors, so bypass the check and always replicate
      if (tableDescriptors == null) {
        return true;
      }

      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
      if (requiresReplication == null) {
        // check if the table requires memstore replication
        // some unit tests drop the table, so bypass the check and always replicate in that case
        TableDescriptor htd = tableDescriptors.get(tableName);
        requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
        memstoreReplicationEnabled.put(tableName, requiresReplication);
      }

      // if memstore replication is not required, check the entries.
      // meta edits (e.g. flush) must always be replicated.
      if (!requiresReplication) {
        int skipEdits = 0;
        java.util.Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
          Entry entry = it.next();
          if (entry.getEdit().isMetaEdit()) {
            requiresReplication = true;
          } else {
            it.remove();
            skipEdits++;
          }
        }
        totalSkippedEdits.addAndGet(skipEdits);
      }
      return requiresReplication;
    }

    @Override
    protected int getNumOpenWriters() {
      // this sink does not write files, so there are no open writers to report
      return 0;
    }
  }

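  /**
   * Sends the WAL entries of a single region to all of its secondary replicas, in parallel, via
   * retrying replay RPCs. Keeps a short-lived cache of tables known to be disabled or dropped so
   * that their edits can be skipped cheaply.
   */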
  static class RegionReplicaSinkWriter {
    RegionReplicaOutputSink sink;
    ClusterConnection connection;
    RpcControllerFactory rpcControllerFactory;
    RpcRetryingCallerFactory rpcRetryingCallerFactory;
    int operationTimeout;
    ExecutorService pool;
    Cache<TableName, Boolean> disabledAndDroppedTables;
    TableDescriptors tableDescriptors;

    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
      ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
      this.sink = sink;
      this.connection = connection;
      this.operationTimeout = operationTimeout;
      this.rpcRetryingCallerFactory =
        RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
          connection.getConnectionConfiguration(), connection.getConnectionMetrics());
      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
      this.pool = pool;
      this.tableDescriptors = tableDescriptors;

      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
      // A cache of tables known to be disabled or dropped, with a default expiry of 5 sec. This
      // means that if the table is created or enabled again under the same name, we might fail to
      // replicate for that amount of time. But the cache prevents hammering meta with a lookup
      // for every edit of a dropped or disabled table.
      disabledAndDroppedTables = CacheBuilder.newBuilder()
        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
        .maximumSize(1000).build();
    }

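    /**
     * Replays the given entries, which all belong to the region identified by
     * {@code encodedRegionName}, on every secondary replica of that region. Entries are skipped
     * (and counted as skipped) if the table is cached as disabled or dropped, or if the region
     * has since split, merged or been re-created so that the located primary no longer matches
     * the original region of the WAL edits.
     */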
    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
      List<Entry> entries) throws IOException {

      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
            + " is cached as a disabled or dropped table");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        sink.getSkippedEditsCounter().addAndGet(entries.size());
        return;
      }

      // If the table is disabled or dropped, we should not replay the entries and can skip them.
      // However, we might not know that the table is disabled or dropped until we invalidate the
      // location cache and check meta.
      RegionLocations locations = null;
      boolean useCache = true;
      while (true) {
        // get the replicas of the primary region
        try {
          locations =
            RegionReplicaReplayCallable.getRegionLocations(connection, tableName, row, useCache, 0);
          if (locations == null) {
            throw new HBaseIOException(
              "Cannot locate locations for " + tableName + ", row:" + Bytes.toStringBinary(row));
          }
          // Replicas can take a while to come online. The cache may have only the primary. If we
          // keep going to the cache, we will not learn of the replicas and their locations after
          // they come online.
          if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) {
            if (tableDescriptors.get(tableName).getRegionReplication() > 1) {
              // Make an obnoxious log here. See how bad this issue is. Add a timer if happening
              // too much.
              LOG.info("Skipping location cache; only one location found for {}", tableName);
              useCache = false;
              continue;
            }
          }
        } catch (TableNotFoundException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
              + " is dropped. Adding table to cache.");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
          // skip this entry
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }

        // check whether we should still replay this entry. If the regions are changed, or the
        // entry is not coming from the primary region, filter it out.
        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
        if (
          !Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)
        ) {
          if (useCache) {
            useCache = false;
            continue; // this will retry location lookup
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
              + " from WALEdit");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }
        break;
      }

      if (locations.size() == 1) {
        return;
      }

      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);

      // All passed entries should belong to one region because they come from the EntryBuffers
      // split per region. But the regions might split and merge (unlike the log recovery case).
      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
          RegionInfo regionInfo = location == null
            ? RegionReplicaUtil.getRegionInfoForReplica(
              locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
            : location.getRegionInfo();
          RegionReplicaReplayCallable callable =
            new RegionReplicaReplayCallable(connection, rpcControllerFactory, tableName, location,
              regionInfo, row, entries, sink.getSkippedEditsCounter());
          Future<ReplicateWALEntryResponse> task = pool.submit(
            new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
          tasks.add(task);
        }
      }

      boolean tasksCancelled = false;
      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
        try {
          tasks.get(replicaId).get();
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          boolean canBeSkipped = false;
          if (cause instanceof IOException) {
            // The table can be disabled or dropped at this time. For disabled tables, we have no
            // cheap mechanism to detect this case because meta does not contain this information.
            // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
            // RPC. So instead we start the replay RPC with retries, and check whether the table is
            // dropped or disabled only if we get an IOException (a dropped or disabled table may
            // surface as SocketTimeoutException, RetriesExhaustedException or similar).
            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
              canBeSkipped = true;
            } else if (tableDescriptors != null) {
              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
              if (
                tableDescriptor != null
                  // (replicaId + 1) because no task is added for the primary replica
                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)
              ) {
                canBeSkipped = true;
              }
            }
            if (canBeSkipped) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
                  + " because received exception for dropped or disabled table", cause);
                for (Entry entry : entries) {
                  LOG.trace("Skipping : " + entry);
                }
              }
              if (!tasksCancelled) {
                sink.getSkippedEditsCounter().addAndGet(entries.size());
                tasksCancelled = true; // so that we do not add to skipped counter again
              }
              continue;
            }

            // otherwise rethrow
            throw (IOException) cause;
          }
          // unexpected exception
          throw new IOException(cause);
        }
      }
    }
  }

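  /**
   * Adapts a {@link RetryingCallable} to a {@link Callable} so that it can be submitted to the
   * shared thread pool; each invocation goes through an RPC retrying caller with the configured
   * operation timeout.
   */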
  static class RetryingRpcCallable<V> implements Callable<V> {
    RpcRetryingCallerFactory factory;
    RetryingCallable<V> callable;
    int timeout;

    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
      int timeout) {
      this.factory = factory;
      this.callable = callable;
      this.timeout = timeout;
    }

    @Override
    public V call() throws Exception {
      return factory.<V> newCaller().callWithRetries(callable, timeout);
    }
  }

  /**
   * Calls replay on the passed entries for the given region replica. Skips the entries if the
   * region boundaries have changed or the region is gone.
   */
  static class RegionReplicaReplayCallable
    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
    private final List<Entry> entries;
    private final byte[] initialEncodedRegionName;
    private final AtomicLong skippedEntries;

    public RegionReplicaReplayCallable(ClusterConnection connection,
      RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation location,
      RegionInfo regionInfo, byte[] row, List<Entry> entries, AtomicLong skippedEntries) {
      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
      this.entries = entries;
      this.skippedEntries = skippedEntries;
      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
    }

    @Override
    public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
      // Check whether we should still replay this entry. If the regions have changed, or the
      // entry is not coming from the primary region, filter it out because we do not need it.
      // Regions can change because of (1) region split (2) region merge (3) table recreated
      boolean skip = false;
      if (
        !Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName)
      ) {
        skip = true;
      }
      if (!this.entries.isEmpty() && !skip) {
        Entry[] entriesArray = this.entries.toArray(new Entry[0]);

        // set the region name for the target region replica
        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
          ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray,
            location.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
        controller.setCellScanner(p.getSecond());
        return stub.replay(controller, p.getFirst());
      }

      if (skip) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
            + " because located region " + location.getRegionInfo().getEncodedName()
            + " is different than the original region "
            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        skippedEntries.addAndGet(entries.size());
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    }
  }
}