001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.ExtendedCell; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.replication.WALEntryFilter; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.hadoop.hbase.wal.WALEdit; 038import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.yetus.audience.InterfaceStability; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 049 050/** 051 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 052 * onto a queue 053 */ 054@InterfaceAudience.Private 055@InterfaceStability.Evolving 056class ReplicationSourceWALReader extends Thread { 057 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 058 059 private final ReplicationSourceLogQueue logQueue; 060 private final FileSystem fs; 061 private final Configuration conf; 062 private final WALEntryFilter filter; 063 private final ReplicationSource source; 064 065 @InterfaceAudience.Private 066 final BlockingQueue<WALEntryBatch> entryBatchQueue; 067 // max (heap) size of each batch - multiply by number of batches in queue to get total 068 private final long replicationBatchSizeCapacity; 069 // max count of each batch - multiply by number of batches in queue to get total 070 private final int replicationBatchCountCapacity; 071 // position in the WAL to start reading at 072 private long currentPosition; 073 private final long sleepForRetries; 074 private final int maxRetriesMultiplier; 075 076 // Indicates whether this particular worker is running 077 private boolean isReaderRunning = true; 078 private final String walGroupId; 079 080 /** 081 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 082 * entries, and puts them on a batch queue. 083 * @param fs the files system to use 084 * @param conf configuration to use 085 * @param logQueue The WAL queue to read off of 086 * @param startPosition position in the first WAL to start reading from 087 * @param filter The filter to use while reading 088 * @param source replication source 089 */ 090 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 091 ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, 092 ReplicationSource source, String walGroupId) { 093 this.logQueue = logQueue; 094 this.currentPosition = startPosition; 095 this.fs = fs; 096 this.conf = conf; 097 this.filter = filter; 098 this.source = source; 099 this.replicationBatchSizeCapacity = 100 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 101 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 102 // memory used will be batchSizeCapacity * (nb.batches + 1) 103 // the +1 is for the current thread reading before placing onto the queue 104 int batchCount = conf.getInt("replication.source.nb.batches", 1); 105 // 1 second 106 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 107 // 5 minutes @ 1 sec per 108 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 109 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 110 this.walGroupId = walGroupId; 111 LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " 112 + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 113 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 114 + ", replicationBatchQueueCapacity=" + batchCount); 115 } 116 117 private void replicationDone() throws InterruptedException { 118 // we're done with current queue, either this is a recovered queue, or it is the special 119 // group for a sync replication peer and the peer has been transited to DA or S state. 120 LOG.debug("Stopping the replication source wal reader"); 121 setReaderRunning(false); 122 // shuts down shipper thread immediately 123 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 124 } 125 126 protected final int sleep(int sleepMultiplier) { 127 if (sleepMultiplier < maxRetriesMultiplier) { 128 sleepMultiplier++; 129 } 130 Threads.sleep(sleepForRetries * sleepMultiplier); 131 return sleepMultiplier; 132 } 133 134 @Override 135 public void run() { 136 int sleepMultiplier = 1; 137 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 138 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, 139 source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { 140 while (isReaderRunning()) { // loop here to keep reusing stream while we can 141 if (!source.isPeerEnabled()) { 142 Threads.sleep(sleepForRetries); 143 continue; 144 } 145 if (!checkBufferQuota()) { 146 continue; 147 } 148 Path currentPath = entryStream.getCurrentPath(); 149 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 150 if (hasNext == WALEntryStream.HasNext.NO) { 151 replicationDone(); 152 return; 153 } 154 // first, check if we have switched a file, if so, we need to manually add an EOF entry 155 // batch to the queue 156 if (currentPath != null && switched(entryStream, currentPath)) { 157 entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath)); 158 continue; 159 } 160 if (hasNext == WALEntryStream.HasNext.RETRY) { 161 // sleep and retry 162 sleepMultiplier = sleep(sleepMultiplier); 163 continue; 164 } 165 if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) { 166 // retry immediately, this usually means we have switched a file 167 continue; 168 } 169 // below are all for hasNext == YES 170 WALEntryBatch batch = createBatch(entryStream); 171 boolean successAddToQueue = false; 172 try { 173 readWALEntries(entryStream, batch); 174 currentPosition = entryStream.getPosition(); 175 // need to propagate the batch even it has no entries since it may carry the last 176 // sequence id information for serial replication. 177 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 178 entryBatchQueue.put(batch); 179 successAddToQueue = true; 180 sleepMultiplier = 1; 181 } finally { 182 if (!successAddToQueue) { 183 // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should 184 // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which 185 // acquired in ReplicationSourceWALReader.acquireBufferQuota. 186 this.getSourceManager().releaseWALEntryBatchBufferQuota(batch); 187 } 188 } 189 } 190 } catch (WALEntryFilterRetryableException e) { 191 // here we have to recreate the WALEntryStream, as when filtering, we have already called 192 // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it 193 // just considers everything is fine,that's why the catch block is not in the inner block 194 LOG.warn("Failed to filter WAL entries and the filter let us retry later", e); 195 sleepMultiplier = sleep(sleepMultiplier); 196 } catch (InterruptedException e) { 197 // this usually means we want to quit 198 LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue", 199 e); 200 Thread.currentThread().interrupt(); 201 } 202 } 203 } 204 205 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 206 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 207 WALEdit edit = entry.getEdit(); 208 if (edit == null || edit.isEmpty()) { 209 LOG.trace("Edit null or empty for entry {} ", entry); 210 return false; 211 } 212 LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 213 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 214 updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); 215 long entrySize = getEntrySizeIncludeBulkLoad(entry); 216 batch.addEntry(entry, entrySize); 217 updateBatchStats(batch, entry, entrySize); 218 boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry); 219 220 // Stop if too many entries or too big 221 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity 222 || batch.getNbEntries() >= replicationBatchCountCapacity; 223 } 224 225 protected static final boolean switched(WALEntryStream entryStream, Path path) { 226 Path newPath = entryStream.getCurrentPath(); 227 return newPath == null || !path.getName().equals(newPath.getName()); 228 } 229 230 // We need to get the WALEntryBatch from the caller so we can add entries in there 231 // This is required in case there is any exception in while reading entries 232 // we do not want to loss the existing entries in the batch 233 protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) 234 throws InterruptedException { 235 Path currentPath = entryStream.getCurrentPath(); 236 for (;;) { 237 Entry entry = entryStream.next(); 238 batch.setLastWalPosition(entryStream.getPosition()); 239 entry = filterEntry(entry); 240 if (entry != null) { 241 if (addEntryToBatch(batch, entry)) { 242 break; 243 } 244 } 245 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 246 // always return if we have switched to a new file 247 if (switched(entryStream, currentPath)) { 248 batch.setEndOfFile(true); 249 break; 250 } 251 if (hasNext != WALEntryStream.HasNext.YES) { 252 // For hasNext other than YES, it is OK to just retry. 253 // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will 254 // return NO again when you call the method next time, so it is OK to just return here and 255 // let the loop in the upper layer to call hasNext again. 256 break; 257 } 258 } 259 } 260 261 public Path getCurrentPath() { 262 // if we've read some WAL entries, get the Path we read from 263 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 264 if (batchQueueHead != null) { 265 return batchQueueHead.getLastWalPath(); 266 } 267 // otherwise, we must be currently reading from the head of the log queue 268 return logQueue.getQueue(walGroupId).peek(); 269 } 270 271 // returns false if we've already exceeded the global quota 272 private boolean checkBufferQuota() { 273 // try not to go over total quota 274 if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { 275 Threads.sleep(sleepForRetries); 276 return false; 277 } 278 return true; 279 } 280 281 private WALEntryBatch createBatch(WALEntryStream entryStream) { 282 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 283 } 284 285 protected final Entry filterEntry(Entry entry) { 286 // Always replicate if this edit is Replication Marker edit. 287 if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) { 288 return entry; 289 } 290 Entry filtered = filter.filter(entry); 291 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 292 LOG.trace("Filtered entry for replication: {}", entry); 293 source.getSourceMetrics().incrLogEditsFiltered(); 294 } 295 return filtered; 296 } 297 298 /** 299 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 300 * batch to become available 301 * @return A batch of entries, along with the position in the log after reading the batch 302 * @throws InterruptedException if interrupted while waiting 303 */ 304 public WALEntryBatch take() throws InterruptedException { 305 return entryBatchQueue.take(); 306 } 307 308 public WALEntryBatch poll(long timeout) throws InterruptedException { 309 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 310 } 311 312 private long getEntrySizeIncludeBulkLoad(Entry entry) { 313 WALEdit edit = entry.getEdit(); 314 return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); 315 } 316 317 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 318 WALEdit edit = entry.getEdit(); 319 batch.incrementHeapSize(entrySize); 320 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 321 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 322 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 323 } 324 325 /** 326 * Count the number of different row keys in the given edit because of mini-batching. We assume 327 * that there's at least one Cell in the WALEdit. 328 * @param edit edit to count row keys from 329 * @return number of different row keys and HFiles 330 */ 331 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 332 List<Cell> cells = edit.getCells(); 333 int distinctRowKeys = 1; 334 int totalHFileEntries = 0; 335 Cell lastCell = cells.get(0); 336 337 int totalCells = edit.size(); 338 for (int i = 0; i < totalCells; i++) { 339 // Count HFiles to be replicated 340 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 341 try { 342 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 343 List<StoreDescriptor> stores = bld.getStoresList(); 344 int totalStores = stores.size(); 345 for (int j = 0; j < totalStores; j++) { 346 totalHFileEntries += stores.get(j).getStoreFileList().size(); 347 } 348 } catch (IOException e) { 349 LOG.error("Failed to deserialize bulk load entry from wal edit. " 350 + "Then its hfiles count will not be added into metric.", e); 351 } 352 } 353 354 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 355 distinctRowKeys++; 356 } 357 lastCell = cells.get(i); 358 } 359 360 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 361 return result; 362 } 363 364 /** 365 * Calculate the total size of all the store files 366 * @param edit edit to count row keys from 367 * @return the total size of the store files 368 */ 369 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 370 List<Cell> cells = edit.getCells(); 371 int totalStoreFilesSize = 0; 372 373 int totalCells = edit.size(); 374 for (int i = 0; i < totalCells; i++) { 375 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 376 try { 377 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 378 List<StoreDescriptor> stores = bld.getStoresList(); 379 int totalStores = stores.size(); 380 for (int j = 0; j < totalStores; j++) { 381 totalStoreFilesSize = 382 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 383 } 384 } catch (IOException e) { 385 LOG.error("Failed to deserialize bulk load entry from wal edit. " 386 + "Size of HFiles part of cell will not be considered in replication " 387 + "request size calculation.", e); 388 } 389 } 390 } 391 return totalStoreFilesSize; 392 } 393 394 /* 395 * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to 396 * cell's value. 397 */ 398 private void updateReplicationMarkerEdit(Entry entry, long offset) { 399 WALEdit edit = entry.getEdit(); 400 // Return early if it is not ReplicationMarker edit. 401 if (!WALEdit.isReplicationMarkerEdit(edit)) { 402 return; 403 } 404 List<Cell> cells = edit.getCells(); 405 Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell"); 406 Cell cell = cells.get(0); 407 // Create a descriptor with region_server_name, wal_name and offset 408 WALProtos.ReplicationMarkerDescriptor.Builder builder = 409 WALProtos.ReplicationMarkerDescriptor.newBuilder(); 410 builder.setRegionServerName(this.source.getServer().getServerName().getHostname()); 411 builder.setWalName(getCurrentPath().getName()); 412 builder.setOffset(offset); 413 WALProtos.ReplicationMarkerDescriptor descriptor = builder.build(); 414 415 // Create a new KeyValue 416 KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), 417 CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray()); 418 ArrayList<ExtendedCell> newCells = new ArrayList<>(); 419 newCells.add(kv); 420 // Update edit with new cell. 421 WALEditInternalHelper.setExtendedCells(edit, newCells); 422 } 423 424 /** Returns whether the reader thread is running */ 425 public boolean isReaderRunning() { 426 return isReaderRunning && !isInterrupted(); 427 } 428 429 /** 430 * @param readerRunning the readerRunning to set 431 */ 432 public void setReaderRunning(boolean readerRunning) { 433 this.isReaderRunning = readerRunning; 434 } 435 436 private ReplicationSourceManager getSourceManager() { 437 return this.source.getSourceManager(); 438 } 439}