/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;

/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
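 * <p/>
 * An illustrative sketch of the lifecycle (in practice this class is created and driven by the
 * region server internals; the variable names below are hypothetical):
 *
 * <pre>{@code
 * RegionReplicationSink sink =
 *   new RegionReplicationSink(conf, primary, tableDesc, bufferManager, flushRequester, conn);
 * // for every WAL edit appended on the primary replica
 * sink.add(walKey, walEdit, rpcCall);
 * // when closing the region
 * sink.stop();
 * sink.waitUntilStopped();
 * }</pre>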
 */
@InterfaceAudience.Private
public class RegionReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

  // The two options below are for replicating meta edits, as a meta edit will usually trigger a
  // refreshStoreFiles call at the remote side, so it will likely take more time. A meta edit is
  // also more important for fixing inconsistent state, so it is worth waiting longer for it.
  public static final String META_EDIT_RPC_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";

  public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;

  public static final String META_EDIT_OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";

  public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;

  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";

  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;

  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";

  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;
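
  // Illustrative only: these settings are normally tuned via hbase-site.xml, but can also be set
  // programmatically, e.g. conf.setInt(RETRIES_NUMBER, 5) or
  // conf.setLong(BATCH_SIZE_CAPACITY, 2L * 1024 * 1024).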

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    final long size;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
      if (rpcCall != null) {
        // Increase the reference count to prevent the rpc framework from freeing the memory
        // before we have actually sent the edits out.
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called once the replication has finished, regardless of the result. Unless you
     * still want to reuse this entry, you must call this method to release the possibly off-heap
     * memory it retains.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;

  // cached here to avoid calling TableDescriptor.getRegionReplication every time we need it
  private final int regionReplication;

  private final RegionReplicationBufferManager manager;

  private final RegionReplicationFlushRequester flushRequester;

  private final AsyncClusterConnection conn;

  // Used to track the ids of the replicas which we have failed to replicate edits to. A replica is
  // only added here when the failed edits are newer than the last flushed sequence id; when we get
  // a flush-all edit, this set is cleared, since dropping the pending edits and replaying the
  // flush recovers the previously failed replicas.
  private final IntHashSet failedReplicas;

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long metaEditRpcTimeoutNs;

  private final long metaEditOperationTimeoutNs;

  private final long batchSizeCapacity;

  private final long batchCountCapacity;

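  // The mutable state below, together with the entries queue above, is guarded by synchronizing
  // on entries; pendingSize is additionally volatile so that pendingSize() can read it without
  // taking the lock.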
  private volatile long pendingSize;

  private long lastFlushedSequenceId;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;

  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    this.regionReplication = td.getRegionReplication();
    Preconditions.checkArgument(this.regionReplication > 1,
      "region replication should be greater than 1 but got %s", this.regionReplication);
    this.primary = primary;
    this.tableDesc = td;
    this.manager = manager;
    this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
    this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
    this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
      conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
    this.failedReplicas = new IntHashSet(regionReplication - 1);
  }

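  /**
   * Called once the replicate requests for a batch have all completed. Releases the sent entries,
   * records the replicas whose failed edits are newer than the last flushed sequence id as failed
   * (and requests a flush so they can recover later), then sends the next batch if there are
   * still pending entries.
   */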
  void onComplete(List<SinkEntry> sent, Map<Integer, MutableObject<Throwable>> replica2Error) {
    long maxSequenceId = Long.MIN_VALUE;
    long toReleaseSize = 0;
    for (SinkEntry entry : sent) {
      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
      entry.replicated();
      toReleaseSize += entry.size;
    }
    manager.decrease(toReleaseSize);
    synchronized (entries) {
      pendingSize -= toReleaseSize;
      boolean addFailedReplicas = false;
      for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
        Integer replicaId = entry.getKey();
        Throwable error = entry.getValue().getValue();
        if (error != null) {
          if (maxSequenceId > lastFlushedSequenceId) {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is greater than the last flush SN {},"
                + " we will stop replicating for a while and trigger a flush",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
            failedReplicas.add(replicaId);
            addFailedReplicas = true;
          } else {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is less than or equal to the last flush SN {},"
                + " we will not stop replicating",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
          }
        }
      }

      if (addFailedReplicas) {
        flushRequester.requestFlush(maxSequenceId);
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }

  private void send() {
    // We should first check whether there are still any healthy replicas to send to
    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      LOG.warn("All the secondary replicas {} have failed, give up sending", failedReplicas);
      return;
    }

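    // Drain queued entries into a single batch, bounded by the configured batch count and size
    // capacities; the whole batch is replicated with one call per healthy secondary replica.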
    List<SinkEntry> toSend = new ArrayList<>();
    long totalSize = 0L;
    boolean hasMetaEdit = false;
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
      totalSize += entry.size;
      hasMetaEdit |= entry.edit.isMetaEdit();
      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
        break;
      }
    }

    long rpcTimeoutNsToUse;
    long operationTimeoutNsToUse;
    if (!hasMetaEdit) {
      rpcTimeoutNsToUse = rpcTimeoutNs;
      operationTimeoutNsToUse = operationTimeoutNs;
    } else {
      rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
      operationTimeoutNsToUse = metaEditOperationTimeoutNs;
    }
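    // Replicate the batch to every replica which has not been marked as failed; onComplete is
    // invoked once the last outstanding replicate call finishes.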
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
        (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }

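  /**
   * Returns whether the given flush descriptor represents a flush of all column families. A
   * CANNOT_FLUSH marker also counts, since it means the memstore is empty.
   */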
  private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
    if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
      // this means the memstore is empty, so all data before this sequence id has already been
      // flushed out, which is equivalent to a flush of all stores, so return true
      return true;
    }
    if (flushDesc.getAction() != FlushAction.START_FLUSH) {
      return false;
    }
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }

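  /**
   * Returns the {@link FlushDescriptor} if the given meta cell marks the start of a flush of all
   * stores, otherwise {@link Optional#empty()}.
   */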
  Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
      return Optional.empty();
    }
    FlushDescriptor flushDesc;
    try {
      flushDesc = WALEdit.getFlushDescriptor(metaCell);
    } catch (IOException e) {
      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
      return Optional.empty();
    }
    if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
      return Optional.of(flushDesc);
    } else {
      return Optional.empty();
    }
  }

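  /**
   * Releases all pending entries and returns the total size cleared. The caller must hold the
   * lock on {@code entries}.
   */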
  private long clearAllEntries() {
    long toClearSize = 0;
    for (SinkEntry entry : entries) {
      toClearSize += entry.size;
      entry.replicated();
    }
    entries.clear();
    pendingSize -= toClearSize;
    manager.decrease(toClearSize);
    return toClearSize;
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} is used to retain the cells if the edit is built within an rpc call and
   * the rpc call has a cell scanner, which is backed by off-heap memory.
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // if region memstore replication is not enabled, we only replicate meta edits
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // Check whether we have flushed all stores, which means we could drop all the previous
        // edits, and also recover from the previous failures of some replicas
        for (Cell metaCell : edit.getCells()) {
          getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
            lastFlushedSequenceId = flushSequenceNumber;
            long clearedCount = entries.size();
            long clearedSize = clearAllEntries();
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Got a flush all request with sequence id {}, clear {} pending"
                  + " entries with size {}, clear failed replicas {}",
                flushSequenceNumber, clearedCount,
                StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
                failedReplicas);
            }
            failedReplicas.clear();
            flushRequester.recordFlush(flushSequenceNumber);
          });
        }
      }
      if (failedReplicas.size() == regionReplication - 1) {
        // this means we have marked all the replicas as failed, so just give up here
        return;
      }
      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
      entries.add(entry);
      pendingSize += entry.size;
      if (manager.increase(entry.size)) {
        if (!sending) {
          send();
        }
      } else {
        // We have exceeded the max pending size, so drop all the edits and mark all replicas as
        // failed
        clearAllEntries();
        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
          failedReplicas.add(replicaId);
        }
        flushRequester.requestFlush(entry.key.getSequenceId());
      }
    }
  }

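  /**
   * Returns the estimated total size of the edits which have been added to this sink and are
   * still pending, i.e. not yet replicated or cleared.
   */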
  long pendingSize() {
    return pendingSize;
  }

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  public void stop() {
    synchronized (entries) {
      stopping = true;
      clearAllEntries();
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Make sure that we have finished all the pending replication requests.
   * <p/>
   * After this method returns, it is guaranteed that no new replication requests will be sent to
   * the secondary replicas.
   * <p/>
   * This is used to keep the replication order the same as the WAL edit order when writing.
   */
  public void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  IntHashSet getFailedReplicas() {
    synchronized (entries) {
      return this.failedReplicas;
    }
  }
}