/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * ReplicationSourceWALReader.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final ReplicationSourceLogQueue logQueue;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.logQueue = logQueue;
    this.source = source;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory, not expensive, so we do not need to increase the
        // sleep interval as it may cause a long lag when we enable the peer.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // It is interrupted and needs to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.workerThreads.remove(this.walGroupId);
      postFinish();
    }
  }

  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // the current thread might have been interrupted to terminate;
          // go back to the while() condition to confirm this
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("shipped entry {}: ", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // offsets totalBufferUsed by deducting the shipped batch size (excludes bulk load size)
        // this size has to use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader, because both maintain the same variable: totalBufferUsed
        source.postShipEdits(entries, entryBatch.getUsedBufferSize());
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true and
    // there is no entry in the batch. It is OK because the queue storage will ignore the zero
    // position and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in queue.
    // the only exception is for recovered queue: if we reach the end of the queue, then there will
    // be no more files, so here the currentPath may be null.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>.
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both shipper and reader threads to terminate, to make sure there are no
   * race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Interruption/termination
   * of those <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} Shipper clearWALEntryBatch method timed out whilst waiting for the reader/shipper "
              + "threads to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    long totalReleasedBytes = 0;
    while (true) {
      WALEntryBatch batch = entryReader.entryBatchQueue.poll();
      if (batch == null) {
        break;
      }
      totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalReleasedBytes);
    }
  }
}