001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.ExtendedCell;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.hadoop.hbase.wal.WALEdit;
038import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
049
050/**
051 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
052 * onto a queue
053 */
054@InterfaceAudience.Private
055@InterfaceStability.Evolving
056class ReplicationSourceWALReader extends Thread {
057  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
058
059  private final ReplicationSourceLogQueue logQueue;
060  private final FileSystem fs;
061  private final Configuration conf;
062  private final WALEntryFilter filter;
063  private final ReplicationSource source;
064
065  @InterfaceAudience.Private
066  final BlockingQueue<WALEntryBatch> entryBatchQueue;
067  // max (heap) size of each batch - multiply by number of batches in queue to get total
068  private final long replicationBatchSizeCapacity;
069  // max count of each batch - multiply by number of batches in queue to get total
070  private final int replicationBatchCountCapacity;
071  // position in the WAL to start reading at
072  private long currentPosition;
073  private final long sleepForRetries;
074  private final int maxRetriesMultiplier;
075
076  // Indicates whether this particular worker is running
077  private boolean isReaderRunning = true;
078  private final String walGroupId;
079
080  /**
081   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
082   * entries, and puts them on a batch queue.
083   * @param fs            the files system to use
084   * @param conf          configuration to use
085   * @param logQueue      The WAL queue to read off of
086   * @param startPosition position in the first WAL to start reading from
087   * @param filter        The filter to use while reading
088   * @param source        replication source
089   */
090  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
091    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
092    ReplicationSource source, String walGroupId) {
093    this.logQueue = logQueue;
094    this.currentPosition = startPosition;
095    this.fs = fs;
096    this.conf = conf;
097    this.filter = filter;
098    this.source = source;
099    this.replicationBatchSizeCapacity =
100      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
101    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
102    // memory used will be batchSizeCapacity * (nb.batches + 1)
103    // the +1 is for the current thread reading before placing onto the queue
104    int batchCount = conf.getInt("replication.source.nb.batches", 1);
105    // 1 second
106    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
107    // 5 minutes @ 1 sec per
108    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
109    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
110    this.walGroupId = walGroupId;
111    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
112      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
113      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
114      + ", replicationBatchQueueCapacity=" + batchCount);
115  }
116
117  private void replicationDone() throws InterruptedException {
118    // we're done with current queue, either this is a recovered queue, or it is the special
119    // group for a sync replication peer and the peer has been transited to DA or S state.
120    LOG.debug("Stopping the replication source wal reader");
121    setReaderRunning(false);
122    // shuts down shipper thread immediately
123    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
124  }
125
126  protected final int sleep(int sleepMultiplier) {
127    if (sleepMultiplier < maxRetriesMultiplier) {
128      sleepMultiplier++;
129    }
130    Threads.sleep(sleepForRetries * sleepMultiplier);
131    return sleepMultiplier;
132  }
133
134  @Override
135  public void run() {
136    int sleepMultiplier = 1;
137    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
138      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
139        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
140        while (isReaderRunning()) { // loop here to keep reusing stream while we can
141          if (!source.isPeerEnabled()) {
142            Threads.sleep(sleepForRetries);
143            continue;
144          }
145          if (!checkBufferQuota()) {
146            continue;
147          }
148          Path currentPath = entryStream.getCurrentPath();
149          WALEntryStream.HasNext hasNext = entryStream.hasNext();
150          if (hasNext == WALEntryStream.HasNext.NO) {
151            replicationDone();
152            return;
153          }
154          // first, check if we have switched a file, if so, we need to manually add an EOF entry
155          // batch to the queue
156          if (currentPath != null && switched(entryStream, currentPath)) {
157            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
158            continue;
159          }
160          if (hasNext == WALEntryStream.HasNext.RETRY) {
161            // sleep and retry
162            sleepMultiplier = sleep(sleepMultiplier);
163            continue;
164          }
165          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
166            // retry immediately, this usually means we have switched a file
167            continue;
168          }
169          // below are all for hasNext == YES
170          WALEntryBatch batch = createBatch(entryStream);
171          boolean successAddToQueue = false;
172          try {
173            readWALEntries(entryStream, batch);
174            currentPosition = entryStream.getPosition();
175            // need to propagate the batch even it has no entries since it may carry the last
176            // sequence id information for serial replication.
177            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
178            entryBatchQueue.put(batch);
179            successAddToQueue = true;
180            sleepMultiplier = 1;
181          } finally {
182            if (!successAddToQueue) {
183              // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
184              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
185              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
186              this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
187            }
188          }
189        }
190      } catch (WALEntryFilterRetryableException e) {
191        // here we have to recreate the WALEntryStream, as when filtering, we have already called
192        // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
193        // just considers everything is fine,that's why the catch block is not in the inner block
194        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
195        sleepMultiplier = sleep(sleepMultiplier);
196      } catch (InterruptedException e) {
197        // this usually means we want to quit
198        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
199          e);
200        Thread.currentThread().interrupt();
201      }
202    }
203  }
204
205  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
206  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
207    WALEdit edit = entry.getEdit();
208    if (edit == null || edit.isEmpty()) {
209      LOG.trace("Edit null or empty for entry {} ", entry);
210      return false;
211    }
212    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
213      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
214    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
215    long entrySize = getEntrySizeIncludeBulkLoad(entry);
216    batch.addEntry(entry, entrySize);
217    updateBatchStats(batch, entry, entrySize);
218    boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
219
220    // Stop if too many entries or too big
221    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
222      || batch.getNbEntries() >= replicationBatchCountCapacity;
223  }
224
225  protected static final boolean switched(WALEntryStream entryStream, Path path) {
226    Path newPath = entryStream.getCurrentPath();
227    return newPath == null || !path.getName().equals(newPath.getName());
228  }
229
230  // We need to get the WALEntryBatch from the caller so we can add entries in there
231  // This is required in case there is any exception in while reading entries
232  // we do not want to loss the existing entries in the batch
233  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
234    throws InterruptedException {
235    Path currentPath = entryStream.getCurrentPath();
236    for (;;) {
237      Entry entry = entryStream.next();
238      batch.setLastWalPosition(entryStream.getPosition());
239      entry = filterEntry(entry);
240      if (entry != null) {
241        if (addEntryToBatch(batch, entry)) {
242          break;
243        }
244      }
245      WALEntryStream.HasNext hasNext = entryStream.hasNext();
246      // always return if we have switched to a new file
247      if (switched(entryStream, currentPath)) {
248        batch.setEndOfFile(true);
249        break;
250      }
251      if (hasNext != WALEntryStream.HasNext.YES) {
252        // For hasNext other than YES, it is OK to just retry.
253        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
254        // return NO again when you call the method next time, so it is OK to just return here and
255        // let the loop in the upper layer to call hasNext again.
256        break;
257      }
258    }
259  }
260
261  public Path getCurrentPath() {
262    // if we've read some WAL entries, get the Path we read from
263    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
264    if (batchQueueHead != null) {
265      return batchQueueHead.getLastWalPath();
266    }
267    // otherwise, we must be currently reading from the head of the log queue
268    return logQueue.getQueue(walGroupId).peek();
269  }
270
271  // returns false if we've already exceeded the global quota
272  private boolean checkBufferQuota() {
273    // try not to go over total quota
274    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
275      Threads.sleep(sleepForRetries);
276      return false;
277    }
278    return true;
279  }
280
281  private WALEntryBatch createBatch(WALEntryStream entryStream) {
282    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
283  }
284
285  protected final Entry filterEntry(Entry entry) {
286    // Always replicate if this edit is Replication Marker edit.
287    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
288      return entry;
289    }
290    Entry filtered = filter.filter(entry);
291    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
292      LOG.trace("Filtered entry for replication: {}", entry);
293      source.getSourceMetrics().incrLogEditsFiltered();
294    }
295    return filtered;
296  }
297
298  /**
299   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
300   * batch to become available
301   * @return A batch of entries, along with the position in the log after reading the batch
302   * @throws InterruptedException if interrupted while waiting
303   */
304  public WALEntryBatch take() throws InterruptedException {
305    return entryBatchQueue.take();
306  }
307
308  public WALEntryBatch poll(long timeout) throws InterruptedException {
309    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
310  }
311
312  private long getEntrySizeIncludeBulkLoad(Entry entry) {
313    WALEdit edit = entry.getEdit();
314    return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
315  }
316
317  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
318    WALEdit edit = entry.getEdit();
319    batch.incrementHeapSize(entrySize);
320    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
321    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
322    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
323  }
324
325  /**
326   * Count the number of different row keys in the given edit because of mini-batching. We assume
327   * that there's at least one Cell in the WALEdit.
328   * @param edit edit to count row keys from
329   * @return number of different row keys and HFiles
330   */
331  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
332    List<Cell> cells = edit.getCells();
333    int distinctRowKeys = 1;
334    int totalHFileEntries = 0;
335    Cell lastCell = cells.get(0);
336
337    int totalCells = edit.size();
338    for (int i = 0; i < totalCells; i++) {
339      // Count HFiles to be replicated
340      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
341        try {
342          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
343          List<StoreDescriptor> stores = bld.getStoresList();
344          int totalStores = stores.size();
345          for (int j = 0; j < totalStores; j++) {
346            totalHFileEntries += stores.get(j).getStoreFileList().size();
347          }
348        } catch (IOException e) {
349          LOG.error("Failed to deserialize bulk load entry from wal edit. "
350            + "Then its hfiles count will not be added into metric.", e);
351        }
352      }
353
354      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
355        distinctRowKeys++;
356      }
357      lastCell = cells.get(i);
358    }
359
360    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
361    return result;
362  }
363
364  /**
365   * Calculate the total size of all the store files
366   * @param edit edit to count row keys from
367   * @return the total size of the store files
368   */
369  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
370    List<Cell> cells = edit.getCells();
371    int totalStoreFilesSize = 0;
372
373    int totalCells = edit.size();
374    for (int i = 0; i < totalCells; i++) {
375      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
376        try {
377          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
378          List<StoreDescriptor> stores = bld.getStoresList();
379          int totalStores = stores.size();
380          for (int j = 0; j < totalStores; j++) {
381            totalStoreFilesSize =
382              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
383          }
384        } catch (IOException e) {
385          LOG.error("Failed to deserialize bulk load entry from wal edit. "
386            + "Size of HFiles part of cell will not be considered in replication "
387            + "request size calculation.", e);
388        }
389      }
390    }
391    return totalStoreFilesSize;
392  }
393
394  /*
395   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
396   * cell's value.
397   */
398  private void updateReplicationMarkerEdit(Entry entry, long offset) {
399    WALEdit edit = entry.getEdit();
400    // Return early if it is not ReplicationMarker edit.
401    if (!WALEdit.isReplicationMarkerEdit(edit)) {
402      return;
403    }
404    List<Cell> cells = edit.getCells();
405    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
406    Cell cell = cells.get(0);
407    // Create a descriptor with region_server_name, wal_name and offset
408    WALProtos.ReplicationMarkerDescriptor.Builder builder =
409      WALProtos.ReplicationMarkerDescriptor.newBuilder();
410    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
411    builder.setWalName(getCurrentPath().getName());
412    builder.setOffset(offset);
413    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
414
415    // Create a new KeyValue
416    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
417      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
418    ArrayList<ExtendedCell> newCells = new ArrayList<>();
419    newCells.add(kv);
420    // Update edit with new cell.
421    WALEditInternalHelper.setExtendedCells(edit, newCells);
422  }
423
424  /** Returns whether the reader thread is running */
425  public boolean isReaderRunning() {
426    return isReaderRunning && !isInterrupted();
427  }
428
429  /**
430   * @param readerRunning the readerRunning to set
431   */
432  public void setReaderRunning(boolean readerRunning) {
433    this.isReaderRunning = readerRunning;
434  }
435
436  private ReplicationSourceManager getSourceManager() {
437    return this.source.getSourceManager();
438  }
439}