001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.replication.WALEntryFilter; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.hadoop.hbase.wal.WALEdit; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 048 049/** 050 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 051 * onto a queue 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055class ReplicationSourceWALReader extends Thread { 056 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 057 058 private final ReplicationSourceLogQueue logQueue; 059 private final FileSystem fs; 060 private final Configuration conf; 061 private final WALEntryFilter filter; 062 private final ReplicationSource source; 063 064 @InterfaceAudience.Private 065 final BlockingQueue<WALEntryBatch> entryBatchQueue; 066 // max (heap) size of each batch - multiply by number of batches in queue to get total 067 private final long replicationBatchSizeCapacity; 068 // max count of each batch - multiply by number of batches in queue to get total 069 private final int replicationBatchCountCapacity; 070 // position in the WAL to start reading at 071 private long currentPosition; 072 private final long sleepForRetries; 073 private final int maxRetriesMultiplier; 074 075 // Indicates whether this particular worker is running 076 private boolean isReaderRunning = true; 077 private final String walGroupId; 078 079 AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false); 080 081 /** 082 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 083 * entries, and puts them on a batch queue. 084 * @param fs the files system to use 085 * @param conf configuration to use 086 * @param logQueue The WAL queue to read off of 087 * @param startPosition position in the first WAL to start reading from 088 * @param filter The filter to use while reading 089 * @param source replication source 090 */ 091 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 092 ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, 093 ReplicationSource source, String walGroupId) { 094 this.logQueue = logQueue; 095 this.currentPosition = startPosition; 096 this.fs = fs; 097 this.conf = conf; 098 this.filter = filter; 099 this.source = source; 100 this.replicationBatchSizeCapacity = 101 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 102 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 103 // memory used will be batchSizeCapacity * (nb.batches + 1) 104 // the +1 is for the current thread reading before placing onto the queue 105 int batchCount = conf.getInt("replication.source.nb.batches", 1); 106 // 1 second 107 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 108 // 5 minutes @ 1 sec per 109 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 110 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 111 this.walGroupId = walGroupId; 112 LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " 113 + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 114 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 115 + ", replicationBatchQueueCapacity=" + batchCount); 116 } 117 118 private void replicationDone() throws InterruptedException { 119 // we're done with current queue, either this is a recovered queue, or it is the special 120 // group for a sync replication peer and the peer has been transited to DA or S state. 121 LOG.debug("Stopping the replication source wal reader"); 122 setReaderRunning(false); 123 // shuts down shipper thread immediately 124 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 125 } 126 127 protected final int sleep(int sleepMultiplier) { 128 if (sleepMultiplier < maxRetriesMultiplier) { 129 sleepMultiplier++; 130 } 131 Threads.sleep(sleepForRetries * sleepMultiplier); 132 return sleepMultiplier; 133 } 134 135 @Override 136 public void run() { 137 int sleepMultiplier = 1; 138 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 139 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, 140 source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { 141 while (isReaderRunning()) { // loop here to keep reusing stream while we can 142 if (!source.isPeerEnabled()) { 143 waitingPeerEnabled.set(true); 144 Threads.sleep(sleepForRetries); 145 continue; 146 } else { 147 waitingPeerEnabled.set(false); 148 } 149 if (!checkBufferQuota()) { 150 continue; 151 } 152 Path currentPath = entryStream.getCurrentPath(); 153 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 154 if (hasNext == WALEntryStream.HasNext.NO) { 155 replicationDone(); 156 return; 157 } 158 // first, check if we have switched a file, if so, we need to manually add an EOF entry 159 // batch to the queue 160 if (currentPath != null && switched(entryStream, currentPath)) { 161 entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath)); 162 continue; 163 } 164 if (hasNext == WALEntryStream.HasNext.RETRY) { 165 // sleep and retry 166 sleepMultiplier = sleep(sleepMultiplier); 167 continue; 168 } 169 if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) { 170 // retry immediately, this usually means we have switched a file 171 continue; 172 } 173 // below are all for hasNext == YES 174 WALEntryBatch batch = createBatch(entryStream); 175 boolean successAddToQueue = false; 176 try { 177 readWALEntries(entryStream, batch); 178 currentPosition = entryStream.getPosition(); 179 // need to propagate the batch even it has no entries since it may carry the last 180 // sequence id information for serial replication. 181 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 182 entryBatchQueue.put(batch); 183 successAddToQueue = true; 184 sleepMultiplier = 1; 185 } finally { 186 if (!successAddToQueue) { 187 // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should 188 // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which 189 // acquired in ReplicationSourceWALReader.acquireBufferQuota. 190 this.getSourceManager().releaseWALEntryBatchBufferQuota(batch); 191 } 192 } 193 } 194 } catch (WALEntryFilterRetryableException e) { 195 // here we have to recreate the WALEntryStream, as when filtering, we have already called 196 // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it 197 // just considers everything is fine,that's why the catch block is not in the inner block 198 LOG.warn("Failed to filter WAL entries and the filter let us retry later", e); 199 sleepMultiplier = sleep(sleepMultiplier); 200 } catch (InterruptedException e) { 201 // this usually means we want to quit 202 LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue", 203 e); 204 Thread.currentThread().interrupt(); 205 } 206 } 207 } 208 209 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 210 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 211 WALEdit edit = entry.getEdit(); 212 if (edit == null || edit.isEmpty()) { 213 LOG.trace("Edit null or empty for entry {} ", entry); 214 return false; 215 } 216 LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 217 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 218 updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); 219 long entrySize = getEntrySizeIncludeBulkLoad(entry); 220 batch.addEntry(entry, entrySize); 221 updateBatchStats(batch, entry, entrySize); 222 boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry); 223 224 // Stop if too many entries or too big 225 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity 226 || batch.getNbEntries() >= replicationBatchCountCapacity; 227 } 228 229 protected static final boolean switched(WALEntryStream entryStream, Path path) { 230 Path newPath = entryStream.getCurrentPath(); 231 return newPath == null || !path.getName().equals(newPath.getName()); 232 } 233 234 // We need to get the WALEntryBatch from the caller so we can add entries in there 235 // This is required in case there is any exception in while reading entries 236 // we do not want to loss the existing entries in the batch 237 protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) 238 throws InterruptedException { 239 Path currentPath = entryStream.getCurrentPath(); 240 for (;;) { 241 Entry entry = entryStream.next(); 242 batch.setLastWalPosition(entryStream.getPosition()); 243 entry = filterEntry(entry); 244 if (entry != null) { 245 if (addEntryToBatch(batch, entry)) { 246 break; 247 } 248 } 249 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 250 // always return if we have switched to a new file 251 if (switched(entryStream, currentPath)) { 252 batch.setEndOfFile(true); 253 break; 254 } 255 if (hasNext != WALEntryStream.HasNext.YES) { 256 // For hasNext other than YES, it is OK to just retry. 257 // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will 258 // return NO again when you call the method next time, so it is OK to just return here and 259 // let the loop in the upper layer to call hasNext again. 260 break; 261 } 262 } 263 } 264 265 public Path getCurrentPath() { 266 // if we've read some WAL entries, get the Path we read from 267 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 268 if (batchQueueHead != null) { 269 return batchQueueHead.getLastWalPath(); 270 } 271 // otherwise, we must be currently reading from the head of the log queue 272 return logQueue.getQueue(walGroupId).peek(); 273 } 274 275 // returns false if we've already exceeded the global quota 276 private boolean checkBufferQuota() { 277 // try not to go over total quota 278 if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { 279 Threads.sleep(sleepForRetries); 280 return false; 281 } 282 return true; 283 } 284 285 private WALEntryBatch createBatch(WALEntryStream entryStream) { 286 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 287 } 288 289 protected final Entry filterEntry(Entry entry) { 290 // Always replicate if this edit is Replication Marker edit. 291 if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) { 292 return entry; 293 } 294 Entry filtered = filter.filter(entry); 295 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 296 LOG.trace("Filtered entry for replication: {}", entry); 297 source.getSourceMetrics().incrLogEditsFiltered(); 298 } 299 return filtered; 300 } 301 302 /** 303 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 304 * batch to become available 305 * @return A batch of entries, along with the position in the log after reading the batch 306 * @throws InterruptedException if interrupted while waiting 307 */ 308 public WALEntryBatch take() throws InterruptedException { 309 return entryBatchQueue.take(); 310 } 311 312 public WALEntryBatch poll(long timeout) throws InterruptedException { 313 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 314 } 315 316 private long getEntrySizeIncludeBulkLoad(Entry entry) { 317 WALEdit edit = entry.getEdit(); 318 return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); 319 } 320 321 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 322 WALEdit edit = entry.getEdit(); 323 batch.incrementHeapSize(entrySize); 324 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 325 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 326 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 327 } 328 329 /** 330 * Count the number of different row keys in the given edit because of mini-batching. We assume 331 * that there's at least one Cell in the WALEdit. 332 * @param edit edit to count row keys from 333 * @return number of different row keys and HFiles 334 */ 335 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 336 List<Cell> cells = edit.getCells(); 337 int distinctRowKeys = 1; 338 int totalHFileEntries = 0; 339 Cell lastCell = cells.get(0); 340 341 int totalCells = edit.size(); 342 for (int i = 0; i < totalCells; i++) { 343 // Count HFiles to be replicated 344 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 345 try { 346 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 347 List<StoreDescriptor> stores = bld.getStoresList(); 348 int totalStores = stores.size(); 349 for (int j = 0; j < totalStores; j++) { 350 totalHFileEntries += stores.get(j).getStoreFileList().size(); 351 } 352 } catch (IOException e) { 353 LOG.error("Failed to deserialize bulk load entry from wal edit. " 354 + "Then its hfiles count will not be added into metric.", e); 355 } 356 } 357 358 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 359 distinctRowKeys++; 360 } 361 lastCell = cells.get(i); 362 } 363 364 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 365 return result; 366 } 367 368 /** 369 * Calculate the total size of all the store files 370 * @param edit edit to count row keys from 371 * @return the total size of the store files 372 */ 373 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 374 List<Cell> cells = edit.getCells(); 375 int totalStoreFilesSize = 0; 376 377 int totalCells = edit.size(); 378 for (int i = 0; i < totalCells; i++) { 379 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 380 try { 381 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 382 List<StoreDescriptor> stores = bld.getStoresList(); 383 int totalStores = stores.size(); 384 for (int j = 0; j < totalStores; j++) { 385 totalStoreFilesSize = 386 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 387 } 388 } catch (IOException e) { 389 LOG.error("Failed to deserialize bulk load entry from wal edit. " 390 + "Size of HFiles part of cell will not be considered in replication " 391 + "request size calculation.", e); 392 } 393 } 394 } 395 return totalStoreFilesSize; 396 } 397 398 /* 399 * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to 400 * cell's value. 401 */ 402 private void updateReplicationMarkerEdit(Entry entry, long offset) { 403 WALEdit edit = entry.getEdit(); 404 // Return early if it is not ReplicationMarker edit. 405 if (!WALEdit.isReplicationMarkerEdit(edit)) { 406 return; 407 } 408 List<Cell> cells = edit.getCells(); 409 Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell"); 410 Cell cell = cells.get(0); 411 // Create a descriptor with region_server_name, wal_name and offset 412 WALProtos.ReplicationMarkerDescriptor.Builder builder = 413 WALProtos.ReplicationMarkerDescriptor.newBuilder(); 414 builder.setRegionServerName(this.source.getServer().getServerName().getHostname()); 415 builder.setWalName(getCurrentPath().getName()); 416 builder.setOffset(offset); 417 WALProtos.ReplicationMarkerDescriptor descriptor = builder.build(); 418 419 // Create a new KeyValue 420 KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), 421 CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray()); 422 ArrayList<Cell> newCells = new ArrayList<>(); 423 newCells.add(kv); 424 // Update edit with new cell. 425 edit.setCells(newCells); 426 } 427 428 /** Returns whether the reader thread is running */ 429 public boolean isReaderRunning() { 430 return isReaderRunning && !isInterrupted(); 431 } 432 433 /** 434 * @param readerRunning the readerRunning to set 435 */ 436 public void setReaderRunning(boolean readerRunning) { 437 this.isReaderRunning = readerRunning; 438 } 439 440 private ReplicationSourceManager getSourceManager() { 441 return this.source.getSourceManager(); 442 } 443}