001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.hadoop.hbase.wal.WALEdit;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
048
049/**
050 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
051 * onto a queue
052 */
053@InterfaceAudience.Private
054@InterfaceStability.Evolving
055class ReplicationSourceWALReader extends Thread {
056  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
057
058  private final ReplicationSourceLogQueue logQueue;
059  private final FileSystem fs;
060  private final Configuration conf;
061  private final WALEntryFilter filter;
062  private final ReplicationSource source;
063
064  @InterfaceAudience.Private
065  final BlockingQueue<WALEntryBatch> entryBatchQueue;
066  // max (heap) size of each batch - multiply by number of batches in queue to get total
067  private final long replicationBatchSizeCapacity;
068  // max count of each batch - multiply by number of batches in queue to get total
069  private final int replicationBatchCountCapacity;
070  // position in the WAL to start reading at
071  private long currentPosition;
072  private final long sleepForRetries;
073  private final int maxRetriesMultiplier;
074
075  // Indicates whether this particular worker is running
076  private boolean isReaderRunning = true;
077  private final String walGroupId;
078
079  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
080
081  /**
082   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
083   * entries, and puts them on a batch queue.
084   * @param fs            the files system to use
085   * @param conf          configuration to use
086   * @param logQueue      The WAL queue to read off of
087   * @param startPosition position in the first WAL to start reading from
088   * @param filter        The filter to use while reading
089   * @param source        replication source
090   */
091  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
092    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
093    ReplicationSource source, String walGroupId) {
094    this.logQueue = logQueue;
095    this.currentPosition = startPosition;
096    this.fs = fs;
097    this.conf = conf;
098    this.filter = filter;
099    this.source = source;
100    this.replicationBatchSizeCapacity =
101      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
102    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
103    // memory used will be batchSizeCapacity * (nb.batches + 1)
104    // the +1 is for the current thread reading before placing onto the queue
105    int batchCount = conf.getInt("replication.source.nb.batches", 1);
106    // 1 second
107    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
108    // 5 minutes @ 1 sec per
109    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
110    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
111    this.walGroupId = walGroupId;
112    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
113      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
114      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
115      + ", replicationBatchQueueCapacity=" + batchCount);
116  }
117
118  private void replicationDone() throws InterruptedException {
119    // we're done with current queue, either this is a recovered queue, or it is the special
120    // group for a sync replication peer and the peer has been transited to DA or S state.
121    LOG.debug("Stopping the replication source wal reader");
122    setReaderRunning(false);
123    // shuts down shipper thread immediately
124    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
125  }
126
127  protected final int sleep(int sleepMultiplier) {
128    if (sleepMultiplier < maxRetriesMultiplier) {
129      sleepMultiplier++;
130    }
131    Threads.sleep(sleepForRetries * sleepMultiplier);
132    return sleepMultiplier;
133  }
134
135  @Override
136  public void run() {
137    int sleepMultiplier = 1;
138    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
139      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
140        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
141        while (isReaderRunning()) { // loop here to keep reusing stream while we can
142          if (!source.isPeerEnabled()) {
143            waitingPeerEnabled.set(true);
144            Threads.sleep(sleepForRetries);
145            continue;
146          } else {
147            waitingPeerEnabled.set(false);
148          }
149          if (!checkBufferQuota()) {
150            continue;
151          }
152          Path currentPath = entryStream.getCurrentPath();
153          WALEntryStream.HasNext hasNext = entryStream.hasNext();
154          if (hasNext == WALEntryStream.HasNext.NO) {
155            replicationDone();
156            return;
157          }
158          // first, check if we have switched a file, if so, we need to manually add an EOF entry
159          // batch to the queue
160          if (currentPath != null && switched(entryStream, currentPath)) {
161            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
162            continue;
163          }
164          if (hasNext == WALEntryStream.HasNext.RETRY) {
165            // sleep and retry
166            sleepMultiplier = sleep(sleepMultiplier);
167            continue;
168          }
169          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
170            // retry immediately, this usually means we have switched a file
171            continue;
172          }
173          // below are all for hasNext == YES
174          WALEntryBatch batch = createBatch(entryStream);
175          boolean successAddToQueue = false;
176          try {
177            readWALEntries(entryStream, batch);
178            currentPosition = entryStream.getPosition();
179            // need to propagate the batch even it has no entries since it may carry the last
180            // sequence id information for serial replication.
181            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
182            entryBatchQueue.put(batch);
183            successAddToQueue = true;
184            sleepMultiplier = 1;
185          } finally {
186            if (!successAddToQueue) {
187              // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
188              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
189              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
190              this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
191            }
192          }
193        }
194      } catch (WALEntryFilterRetryableException e) {
195        // here we have to recreate the WALEntryStream, as when filtering, we have already called
196        // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
197        // just considers everything is fine,that's why the catch block is not in the inner block
198        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
199        sleepMultiplier = sleep(sleepMultiplier);
200      } catch (InterruptedException e) {
201        // this usually means we want to quit
202        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
203          e);
204        Thread.currentThread().interrupt();
205      }
206    }
207  }
208
209  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
210  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
211    WALEdit edit = entry.getEdit();
212    if (edit == null || edit.isEmpty()) {
213      LOG.trace("Edit null or empty for entry {} ", entry);
214      return false;
215    }
216    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
217      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
218    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
219    long entrySize = getEntrySizeIncludeBulkLoad(entry);
220    batch.addEntry(entry, entrySize);
221    updateBatchStats(batch, entry, entrySize);
222    boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
223
224    // Stop if too many entries or too big
225    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
226      || batch.getNbEntries() >= replicationBatchCountCapacity;
227  }
228
229  protected static final boolean switched(WALEntryStream entryStream, Path path) {
230    Path newPath = entryStream.getCurrentPath();
231    return newPath == null || !path.getName().equals(newPath.getName());
232  }
233
234  // We need to get the WALEntryBatch from the caller so we can add entries in there
235  // This is required in case there is any exception in while reading entries
236  // we do not want to loss the existing entries in the batch
237  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
238    throws InterruptedException {
239    Path currentPath = entryStream.getCurrentPath();
240    for (;;) {
241      Entry entry = entryStream.next();
242      batch.setLastWalPosition(entryStream.getPosition());
243      entry = filterEntry(entry);
244      if (entry != null) {
245        if (addEntryToBatch(batch, entry)) {
246          break;
247        }
248      }
249      WALEntryStream.HasNext hasNext = entryStream.hasNext();
250      // always return if we have switched to a new file
251      if (switched(entryStream, currentPath)) {
252        batch.setEndOfFile(true);
253        break;
254      }
255      if (hasNext != WALEntryStream.HasNext.YES) {
256        // For hasNext other than YES, it is OK to just retry.
257        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
258        // return NO again when you call the method next time, so it is OK to just return here and
259        // let the loop in the upper layer to call hasNext again.
260        break;
261      }
262    }
263  }
264
265  public Path getCurrentPath() {
266    // if we've read some WAL entries, get the Path we read from
267    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
268    if (batchQueueHead != null) {
269      return batchQueueHead.getLastWalPath();
270    }
271    // otherwise, we must be currently reading from the head of the log queue
272    return logQueue.getQueue(walGroupId).peek();
273  }
274
275  // returns false if we've already exceeded the global quota
276  private boolean checkBufferQuota() {
277    // try not to go over total quota
278    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
279      Threads.sleep(sleepForRetries);
280      return false;
281    }
282    return true;
283  }
284
285  private WALEntryBatch createBatch(WALEntryStream entryStream) {
286    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
287  }
288
289  protected final Entry filterEntry(Entry entry) {
290    // Always replicate if this edit is Replication Marker edit.
291    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
292      return entry;
293    }
294    Entry filtered = filter.filter(entry);
295    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
296      LOG.trace("Filtered entry for replication: {}", entry);
297      source.getSourceMetrics().incrLogEditsFiltered();
298    }
299    return filtered;
300  }
301
302  /**
303   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
304   * batch to become available
305   * @return A batch of entries, along with the position in the log after reading the batch
306   * @throws InterruptedException if interrupted while waiting
307   */
308  public WALEntryBatch take() throws InterruptedException {
309    return entryBatchQueue.take();
310  }
311
312  public WALEntryBatch poll(long timeout) throws InterruptedException {
313    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
314  }
315
316  private long getEntrySizeIncludeBulkLoad(Entry entry) {
317    WALEdit edit = entry.getEdit();
318    return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
319  }
320
321  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
322    WALEdit edit = entry.getEdit();
323    batch.incrementHeapSize(entrySize);
324    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
325    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
326    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
327  }
328
329  /**
330   * Count the number of different row keys in the given edit because of mini-batching. We assume
331   * that there's at least one Cell in the WALEdit.
332   * @param edit edit to count row keys from
333   * @return number of different row keys and HFiles
334   */
335  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
336    List<Cell> cells = edit.getCells();
337    int distinctRowKeys = 1;
338    int totalHFileEntries = 0;
339    Cell lastCell = cells.get(0);
340
341    int totalCells = edit.size();
342    for (int i = 0; i < totalCells; i++) {
343      // Count HFiles to be replicated
344      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
345        try {
346          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
347          List<StoreDescriptor> stores = bld.getStoresList();
348          int totalStores = stores.size();
349          for (int j = 0; j < totalStores; j++) {
350            totalHFileEntries += stores.get(j).getStoreFileList().size();
351          }
352        } catch (IOException e) {
353          LOG.error("Failed to deserialize bulk load entry from wal edit. "
354            + "Then its hfiles count will not be added into metric.", e);
355        }
356      }
357
358      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
359        distinctRowKeys++;
360      }
361      lastCell = cells.get(i);
362    }
363
364    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
365    return result;
366  }
367
368  /**
369   * Calculate the total size of all the store files
370   * @param edit edit to count row keys from
371   * @return the total size of the store files
372   */
373  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
374    List<Cell> cells = edit.getCells();
375    int totalStoreFilesSize = 0;
376
377    int totalCells = edit.size();
378    for (int i = 0; i < totalCells; i++) {
379      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
380        try {
381          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
382          List<StoreDescriptor> stores = bld.getStoresList();
383          int totalStores = stores.size();
384          for (int j = 0; j < totalStores; j++) {
385            totalStoreFilesSize =
386              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
387          }
388        } catch (IOException e) {
389          LOG.error("Failed to deserialize bulk load entry from wal edit. "
390            + "Size of HFiles part of cell will not be considered in replication "
391            + "request size calculation.", e);
392        }
393      }
394    }
395    return totalStoreFilesSize;
396  }
397
398  /*
399   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
400   * cell's value.
401   */
402  private void updateReplicationMarkerEdit(Entry entry, long offset) {
403    WALEdit edit = entry.getEdit();
404    // Return early if it is not ReplicationMarker edit.
405    if (!WALEdit.isReplicationMarkerEdit(edit)) {
406      return;
407    }
408    List<Cell> cells = edit.getCells();
409    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
410    Cell cell = cells.get(0);
411    // Create a descriptor with region_server_name, wal_name and offset
412    WALProtos.ReplicationMarkerDescriptor.Builder builder =
413      WALProtos.ReplicationMarkerDescriptor.newBuilder();
414    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
415    builder.setWalName(getCurrentPath().getName());
416    builder.setOffset(offset);
417    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
418
419    // Create a new KeyValue
420    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
421      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
422    ArrayList<Cell> newCells = new ArrayList<>();
423    newCells.add(kv);
424    // Update edit with new cell.
425    edit.setCells(newCells);
426  }
427
428  /** Returns whether the reader thread is running */
429  public boolean isReaderRunning() {
430    return isReaderRunning && !isInterrupted();
431  }
432
433  /**
434   * @param readerRunning the readerRunning to set
435   */
436  public void setReaderRunning(boolean readerRunning) {
437    this.isReaderRunning = readerRunning;
438  }
439
440  private ReplicationSourceManager getSourceManager() {
441    return this.source.getSourceManager();
442  }
443}