/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
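 * <p>
 * A minimal wiring sketch (simplified; in practice construction and startup happen inside
 * {@code ReplicationSource}, and {@code handler} here is a placeholder for the real uncaught
 * exception handler):
 *
 * <pre>{@code
 * ReplicationSourceShipper shipper =
 *   new ReplicationSourceShipper(conf, walGroupId, logQueue, source);
 * shipper.setWALReader(walReader); // the reader that feeds WALEntryBatch instances
 * shipper.startup(handler);        // starts the run() loop as a daemon thread
 * }</pre>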
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
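  // Typical transitions, as driven by run(): a worker starts RUNNING, then moves to STOPPED
  // when terminated early, or to FINISHED once a recovered queue has been fully drained.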
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final ReplicationSourceLogQueue logQueue;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum value of the sleep multiplier; caps how far the retry backoff can grow
  protected final int maxRetriesMultiplier;
  private static final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.logQueue = logQueue;
    this.source = source;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so we do not need to increase
        // the sleep interval; doing so could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
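        // Block for up to getEntriesTimeout ms waiting for the reader to hand over the next
        // batch; a null return below simply means the poll timed out, so we loop again.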
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // We were interrupted or hit an unrecoverable replication error, so quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.workerThreads.remove(this.walGroupId);
      postFinish();
    }
  }

  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread may have been interrupted as part of termination,
          // so go back to the while() condition to confirm this.
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
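          // A successful ship decays the backoff multiplier, shrinking both the adaptive
          // endpoint timeout above and any retry sleeps back toward their base values.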
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch size (which excludes the bulk
        // load size). This size has to use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader, because both adjust the same variable: totalBufferUsed.
        source.postShipEdits(entries, entryBatch.getUsedBufferSize());
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
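    // Bulk load events are recorded as marker cells carrying a BulkLoadDescriptor under the
    // WALEdit.BULK_LOAD qualifier; scan for them and release the matching hfile references.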
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true
    // and there is no entry in the batch. That is OK because the queue storage will ignore the
    // zero position and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in queue.
    // the only exception is the recovered queue: if we reach the end of the queue, there will
    // be no more files, so here the currentPath may be null.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

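  // Start offset within the current WAL. Presumably overridden by the recovered-queue shipper
  // (cf. noMoreData()/postFinish() above) to resume from a stored position.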
  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
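   * <p>
   * A sketch of the caller pattern used in this class (this mirrors the retry loop in
   * shipEdits): grow the multiplier after each failed attempt until the cap is hit.
   *
   * <pre>{@code
   * if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
   *   sleepMultiplier++; // backoff keeps growing until it reaches maxRetriesMultiplier
   * }
   * }</pre>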
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both shipper and reader threads to terminate, to make sure there are no
   * race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} shipper clearWALEntryBatch method timed out whilst waiting for the reader/shipper "
              + "threads to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both the shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    long totalReleasedBytes = 0;
    while (true) {
      WALEntryBatch batch = entryReader.entryBatchQueue.poll();
      if (batch == null) {
        break;
      }
      totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalReleasedBytes);
    }
  }
}