/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;

/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
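 * <p/>
 * A minimal usage sketch; the wiring below is illustrative only, the actual caller is the region
 * server write path:
 *
 * <pre>
 * RegionReplicationSink sink =
 *   new RegionReplicationSink(conf, primary, td, bufferManager, flushRequester, conn);
 * // on the WAL write path, after the edit has been assigned a sequence id
 * sink.add(walKey, walEdit, rpcCall);
 * // when closing the region
 * sink.stop();
 * sink.waitUntilStopped();
 * </pre>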
 */
@InterfaceAudience.Private
public class RegionReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

  // The two options below are for replicating meta edits. A meta edit usually triggers a
  // refreshStoreFiles call at the remote side, so it is likely to take more time. A meta edit is
  // also more important for fixing inconsistent state, so it is worth waiting longer for it.
  public static final String META_EDIT_RPC_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";

  public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;

  public static final String META_EDIT_OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";

  public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;

  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";

  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;

  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";

  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    final long size;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
      if (rpcCall != null) {
        // Increase the reference count to prevent the rpc framework from freeing the memory
        // before we have actually sent the cells out.
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called after the replication attempt completes, regardless of the result. Unless
     * you still want to reuse this entry, you must call this method to release the possibly
     * off-heap memory.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }
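
  // Concurrency notes: all the mutable state below is guarded by synchronizing on "entries". At
  // most one batch is in flight at any time: "sending" is set before the replicate calls are
  // issued and cleared again in onComplete, which also sends the next batch if more entries are
  // pending.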

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;

  // Store the value here to avoid calling TableDescriptor.getRegionReplication every time.
  private final int regionReplication;

  private final RegionReplicationBufferManager manager;

  private final RegionReplicationFlushRequester flushRequester;

  private final AsyncClusterConnection conn;

  // Tracks the replicas which we have failed to replicate edits to. A replica is only added if
  // the failed edits have a sequence id greater than the last flush-all sequence number,
  // otherwise the data is already persisted and the failure can be ignored. The set is cleared
  // when we see a flush-all request, as after that the secondaries can catch up again.
  private final IntHashSet failedReplicas;

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long metaEditRpcTimeoutNs;

  private final long metaEditOperationTimeoutNs;

  private final long batchSizeCapacity;

  private final long batchCountCapacity;

  private volatile long pendingSize;

  private long lastFlushedSequenceId;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;

  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    this.regionReplication = td.getRegionReplication();
    Preconditions.checkArgument(this.regionReplication > 1,
      "region replication should be greater than 1 but got %s", this.regionReplication);
    this.primary = primary;
    this.tableDesc = td;
    this.manager = manager;
    this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
    this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
    this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
      conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
    this.failedReplicas = new IntHashSet(regionReplication - 1);
  }

  void onComplete(List<SinkEntry> sent, Map<Integer, MutableObject<Throwable>> replica2Error) {
    long maxSequenceId = Long.MIN_VALUE;
    long toReleaseSize = 0;
    for (SinkEntry entry : sent) {
      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
      entry.replicated();
      toReleaseSize += entry.size;
    }
    manager.decrease(toReleaseSize);
    synchronized (entries) {
      pendingSize -= toReleaseSize;
      boolean addFailedReplicas = false;
      for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
        Integer replicaId = entry.getKey();
        Throwable error = entry.getValue().getValue();
        if (error != null) {
          if (maxSequenceId > lastFlushedSequenceId) {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is greater than the last flush sequence"
                + " number {}, we will stop replicating for a while and trigger a flush",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
            failedReplicas.add(replicaId);
            addFailedReplicas = true;
          } else {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is less than or equal to the last flush"
                + " sequence number {}, we will not stop replicating",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
          }
        }
      }

      if (addFailedReplicas) {
        flushRequester.requestFlush(maxSequenceId);
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }
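
  // Sends the next batch to all the secondary replicas which have not been marked as failed. A
  // batch is capped at batchCountCapacity entries or batchSizeCapacity bytes, whichever is hit
  // first, and onComplete is called back once the replicate calls for all the target replicas
  // have finished.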
  private void send() {
    // We should check if there are normal replicas first
    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      LOG.warn("All replicas {} have failed, give up sending", failedReplicas);
      return;
    }

    List<SinkEntry> toSend = new ArrayList<>();
    long totalSize = 0L;
    boolean hasMetaEdit = false;
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
      totalSize += entry.size;
      hasMetaEdit |= entry.edit.isMetaEdit();
      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
        break;
      }
    }

    long rpcTimeoutNsToUse;
    long operationTimeoutNsToUse;
    if (!hasMetaEdit) {
      rpcTimeoutNsToUse = rpcTimeoutNs;
      operationTimeoutNsToUse = operationTimeoutNs;
    } else {
      rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
      operationTimeoutNsToUse = metaEditOperationTimeoutNs;
    }
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
        (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }

  private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
    if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
      // This means the memstore is empty, so all data before this sequence id has already been
      // flushed out. It is equivalent to a flush-all, so return true.
      return true;
    }
    if (flushDesc.getAction() != FlushAction.START_FLUSH) {
      return false;
    }
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }
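
  /**
   * Returns the {@link FlushDescriptor} if the given meta cell indicates that all data up to its
   * flush sequence number has been persisted, i.e. a flush which covers all the stores of the
   * region, or a CANNOT_FLUSH marker which means the memstore is empty; otherwise returns
   * {@link Optional#empty()}.
   */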
  Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
      return Optional.empty();
    }
    FlushDescriptor flushDesc;
    try {
      flushDesc = WALEdit.getFlushDescriptor(metaCell);
    } catch (IOException e) {
      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
      return Optional.empty();
    }
    if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
      return Optional.of(flushDesc);
    } else {
      return Optional.empty();
    }
  }

  private long clearAllEntries() {
    long toClearSize = 0;
    for (SinkEntry entry : entries) {
      toClearSize += entry.size;
      entry.replicated();
    }
    entries.clear();
    pendingSize -= toClearSize;
    manager.decrease(toClearSize);
    return toClearSize;
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} parameter is for retaining the cells when the edit is built within an rpc
   * call and the rpc call has a cell scanner, which is backed by off-heap memory.
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // If region memstore replication is not enabled, only replicate meta edits.
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // Check whether all stores have been flushed, which means we could drop all the previous
        // edits, and also recover from the previous failure of some replicas.
        for (Cell metaCell : edit.getCells()) {
          getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
            lastFlushedSequenceId = flushSequenceNumber;
            long clearedCount = entries.size();
            long clearedSize = clearAllEntries();
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Got a flush all request with sequence id {}, clear {} pending"
                  + " entries with size {}, clear failed replicas {}",
                flushSequenceNumber, clearedCount,
                StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
                failedReplicas);
            }
            failedReplicas.clear();
            flushRequester.recordFlush(flushSequenceNumber);
          });
        }
      }
      if (failedReplicas.size() == regionReplication - 1) {
        // This means we have marked all the replicas as failed, so just give up here.
        return;
      }
      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
      entries.add(entry);
      pendingSize += entry.size;
      if (manager.increase(entry.size)) {
        if (!sending) {
          send();
        }
      } else {
        // We have exceeded the max pending size, so drop all the edits, mark all replicas as
        // failed, and request a flush so the secondaries can catch up later.
        clearAllEntries();
        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
          failedReplicas.add(replicaId);
        }
        flushRequester.requestFlush(entry.key.getSequenceId());
      }
    }
  }

  long pendingSize() {
    return pendingSize;
  }

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  public void stop() {
    synchronized (entries) {
      stopping = true;
      clearAllEntries();
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Wait until we have finished all the pending replication requests.
   * <p/>
   * After this method returns, we can be sure that no new replication requests will be sent to
   * the secondary replicas.
   * <p/>
   * This is used to keep the replication order the same as the WAL edit order when writing.
   */
  public void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  IntHashSet getFailedReplicas() {
    synchronized (entries) {
      return this.failedReplicas;
    }
  }
}