/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;

/**
 * Tests for {@link RegionReplicationSink} that run against a mocked
 * {@link AsyncClusterConnection}, a mocked {@link RegionReplicationBufferManager} and a mocked
 * flush requester.
 * <p>
 * The table is created with a region replication of 3, so each batch handed to the connection
 * results in two {@code replicate} calls, one per secondary replica. Every {@code replicate} call is
 * answered with a future the test completes by hand.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicationSink.class);

  private Configuration conf;

  private TableDescriptor td;

  private RegionInfo primary;

  private Runnable flushRequester;

  private AsyncClusterConnection conn;

  private RegionReplicationBufferManager manager;

  private RegionReplicationSink sink;

  @Rule
  public final TableNameTestRule name = new TableNameTestRule();

  /**
   * Builds a sink over mocked collaborators, with a batch count capacity of 5 and a batch size
   * capacity of 1 MB.
   */
  @Before
  public void setUp() {
    flushRequester = mock(Runnable.class);
    conn = mock(AsyncClusterConnection.class);
    manager = mock(RegionReplicationBufferManager.class);

    conf = HBaseConfiguration.create();
    conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
    conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);

    td = TableDescriptorBuilder.newBuilder(name.getTableName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();

    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
  }

  /** Stops the sink and waits until it reports that it has stopped. */
  @After
  public void tearDown() throws InterruptedException {
    sink.stop();
    sink.waitUntilStopped();
  }

  /** Creates {@code count} futures that are not completed yet. */
  private static List<CompletableFuture<Void>> pendingFutures(int count) {
    return Stream.generate(() -> new CompletableFuture<Void>()).limit(count)
      .collect(Collectors.toList());
  }

  /**
   * Makes every {@code conn.replicate(...)} call return the next element of {@code futures}, in
   * invocation order.
   */
  private void answerReplicateWith(List<CompletableFuture<Void>> futures) {
    MutableInt cursor = new MutableInt(0);
    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
      .then(invocation -> futures.get(cursor.getAndIncrement()));
  }

  /** Stubs the value returned by {@code manager.increase(...)} for subsequent calls. */
  private void stubManagerIncrease(boolean accepted) {
    when(manager.increase(anyLong())).thenReturn(accepted);
  }

  /** Creates a mocked WAL key that reports the given estimated serialized size. */
  private static WALKeyImpl mockKey(long estimatedSize) {
    WALKeyImpl walKey = mock(WALKeyImpl.class);
    when(walKey.estimatedSerializedSizeOf()).thenReturn(estimatedSize);
    return walKey;
  }

  /** Creates a mocked WAL key that reports the given estimated serialized size and sequence id. */
  private static WALKeyImpl mockKey(long estimatedSize, long sequenceId) {
    WALKeyImpl walKey = mockKey(estimatedSize);
    when(walKey.getSequenceId()).thenReturn(sequenceId);
    return walKey;
  }

  /** Creates a mocked WAL edit that reports the given estimated serialized size. */
  private static WALEdit mockEdit(long estimatedSize) {
    WALEdit walEdit = mock(WALEdit.class);
    when(walEdit.estimatedSerializedSizeOf()).thenReturn(estimatedSize);
    return walEdit;
  }

  /**
   * Creates a real START_FLUSH marker edit for the primary region with flush sequence id 2 and an
   * empty committed file list for every column family of the table.
   */
  private WALEdit startFlushMarkerEdit() {
    Map<byte[], List<Path>> noCommittedFiles = td.getColumnFamilyNames().stream()
      .collect(Collectors.toMap(Function.identity(), family -> Collections.emptyList(),
        (first, second) -> {
          throw new IllegalStateException();
        }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
    FlushDescriptor startFlush =
      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, noCommittedFiles);
    return WALEdit.createFlushWALEdit(primary, startFlush);
  }

  /**
   * Successfully completes {@code futures[fromInclusive, toExclusive)}, in ascending index order.
   */
  private static void completeRange(List<CompletableFuture<Void>> futures, int fromInclusive,
    int toExclusive) {
    futures.subList(fromInclusive, toExclusive).forEach(future -> future.complete(null));
  }

  /**
   * A single edit is accounted for when added and is only released after the replicate calls to
   * both secondary replicas have completed.
   */
  @Test
  public void testNormal() {
    List<CompletableFuture<Void>> replicateFutures =
      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
    answerReplicateWith(replicateFutures);
    ServerCall<?> call = mock(ServerCall.class);
    WALKeyImpl walKey = mockKey(100L);
    WALEdit walEdit = mockEdit(1000L);
    stubManagerIncrease(true);

    sink.add(walKey, walEdit, call);
    // the manager must have been asked to account for the entry
    verify(manager).increase(anyLong());
    // the rpc call must have been retained
    verify(call).retainByWAL();
    assertEquals(1100, sink.pendingSize());

    replicateFutures.get(0).complete(null);
    // only one replica has finished: nothing is decreased yet
    verify(manager, never()).decrease(anyLong());
    // and the rpc call is not released yet
    verify(call, never()).releaseByWAL();
    assertEquals(1100, sink.pendingSize());

    replicateFutures.get(1).complete(null);
    // both replicas have finished: the size is given back
    verify(manager).decrease(anyLong());
    // and the rpc call is released
    verify(call).releaseByWAL();
    assertEquals(0, sink.pendingSize());
  }

  /**
   * When the buffer manager rejects an edit, the rejected edit and the edits still waiting to be
   * sent are dropped and a flush is requested, while the edit already being sent is kept.
   */
  @Test
  public void testDropEdits() {
    List<CompletableFuture<Void>> replicateFutures =
      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
    answerReplicateWith(replicateFutures);
    ServerCall<?> firstCall = mock(ServerCall.class);
    WALKeyImpl firstKey = mockKey(100L);
    WALEdit firstEdit = mockEdit(1000L);
    stubManagerIncrease(true);

    sink.add(firstKey, firstEdit, firstCall);
    verify(manager).increase(anyLong());
    verify(manager, never()).decrease(anyLong());
    verify(firstCall).retainByWAL();
    assertEquals(1100, sink.pendingSize());

    ServerCall<?> secondCall = mock(ServerCall.class);
    WALKeyImpl secondKey = mockKey(200L);
    WALEdit secondEdit = mockEdit(2000L);

    sink.add(secondKey, secondEdit, secondCall);
    verify(manager, times(2)).increase(anyLong());
    verify(manager, never()).decrease(anyLong());
    verify(secondCall).retainByWAL();
    assertEquals(3300, sink.pendingSize());

    ServerCall<?> thirdCall = mock(ServerCall.class);
    WALKeyImpl thirdKey = mockKey(200L);
    WALEdit thirdEdit = mockEdit(3000L);
    stubManagerIncrease(false);

    // this edit must not be buffered
    sink.add(thirdKey, thirdEdit, thirdCall);
    verify(manager, times(3)).increase(anyLong());
    verify(manager).decrease(anyLong());
    // the rejected call is retained and then released immediately
    verify(thirdCall).retainByWAL();
    verify(thirdCall).releaseByWAL();
    // the pending (not yet sent) edit is cleared as well
    verify(secondCall).releaseByWAL();
    assertEquals(1100, sink.pendingSize());
    // a flush must have been requested
    verify(flushRequester).run();

    // Finish the replication of the first edit: the size is decreased, the rpc call is released,
    // and pendingSize drops to 0 as there are no pending entries left.
    replicateFutures.forEach(future -> future.complete(null));
    verify(manager, times(2)).decrease(anyLong());
    verify(firstCall).releaseByWAL();
    assertEquals(0, sink.pendingSize());

    // replicate is only called twice, for the first edit, as we have 2 secondary replicas
    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
  }

  /**
   * A failed replicate call does not exclude the replica from later sends when a flush marker has
   * already been added after the failed edit.
   */
  @Test
  public void testNotAddToFailedReplicas() {
    List<CompletableFuture<Void>> replicateFutures = pendingFutures(4);
    answerReplicateWith(replicateFutures);

    ServerCall<?> firstCall = mock(ServerCall.class);
    WALKeyImpl firstKey = mockKey(100L, 1L);
    WALEdit firstEdit = mockEdit(1000L);
    stubManagerIncrease(true);
    sink.add(firstKey, firstEdit, firstCall);

    ServerCall<?> flushCall = mock(ServerCall.class);
    WALKeyImpl flushKey = mockKey(200L, 3L);
    WALEdit flushEdit = startFlushMarkerEdit();
    sink.add(flushKey, flushEdit, flushCall);

    // fail the call to replica 2
    replicateFutures.get(0).complete(null);
    replicateFutures.get(1).completeExceptionally(new IOException("inject error"));

    // The failure must not add replica 2 to failedReplicas, as a flush was already triggered
    // after the failed edit, so the flush marker is sent to both replicas.
    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    completeRange(replicateFutures, 2, 4);

    // everything has been sent out, so there are no pending entries
    assertEquals(0, sink.pendingSize());
  }

  /**
   * A failed replicate call excludes the replica from later sends until a flush marker is added,
   * after which edits are sent to both replicas again.
   */
  @Test
  public void testAddToFailedReplica() {
    List<CompletableFuture<Void>> replicateFutures = pendingFutures(5);
    answerReplicateWith(replicateFutures);

    ServerCall<?> firstCall = mock(ServerCall.class);
    WALKeyImpl firstKey = mockKey(100L, 1L);
    WALEdit firstEdit = mockEdit(1000L);
    stubManagerIncrease(true);
    sink.add(firstKey, firstEdit, firstCall);

    ServerCall<?> secondCall = mock(ServerCall.class);
    WALKeyImpl secondKey = mockKey(200L, 1L);
    WALEdit secondEdit = mockEdit(2000L);
    sink.add(secondKey, secondEdit, secondCall);

    // fail the call to replica 2
    replicateFutures.get(0).complete(null);
    replicateFutures.get(1).completeExceptionally(new IOException("inject error"));

    // replicate is called only once for the second edit, since replica 2 is marked as failed
    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
    replicateFutures.get(2).complete(null);
    // everything has been sent out, so there are no pending entries
    assertEquals(0, sink.pendingSize());

    ServerCall<?> flushCall = mock(ServerCall.class);
    WALKeyImpl flushKey = mockKey(200L, 3L);
    WALEdit flushEdit = startFlushMarkerEdit();
    sink.add(flushKey, flushEdit, flushCall);

    // the flush marker clears failedReplicas, so the edit is sent to 2 replicas again
    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
    completeRange(replicateFutures, 3, 5);

    // everything has been sent out, so there are no pending entries
    assertEquals(0, sink.pendingSize());
  }

  /**
   * Three edits of 600 KB, 1200 KB and 1800 KB are sent in three separate batches because of the
   * 1 MB batch size capacity.
   */
  @Test
  public void testSizeCapacity() {
    List<CompletableFuture<Void>> replicateFutures = pendingFutures(6);
    answerReplicateWith(replicateFutures);
    stubManagerIncrease(true);
    for (long sequenceId = 1; sequenceId <= 3; sequenceId++) {
      ServerCall<?> call = mock(ServerCall.class);
      WALKeyImpl walKey = mockKey(100L, sequenceId);
      WALEdit walEdit = mockEdit(sequenceId * 600L * 1024);
      sink.add(walKey, walEdit, call);
    }
    // the first entry is sent out immediately
    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the first send
    completeRange(replicateFutures, 0, 2);

    // this triggers another batch
    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the second send
    completeRange(replicateFutures, 2, 4);

    // the second entry alone is larger than 1024 * 1024, so a further batch is needed
    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the third send
    completeRange(replicateFutures, 4, 6);

    // everything has been sent out, so there are no pending entries
    assertEquals(0, sink.pendingSize());
  }

  /**
   * Seven small edits are sent in three batches: the first edit on its own, then as many as the
   * batch count capacity of 5 allows, then the remainder.
   */
  @Test
  public void testCountCapacity() {
    List<CompletableFuture<Void>> replicateFutures = pendingFutures(6);
    answerReplicateWith(replicateFutures);
    stubManagerIncrease(true);
    for (long sequenceId = 1; sequenceId <= 7; sequenceId++) {
      ServerCall<?> call = mock(ServerCall.class);
      WALKeyImpl walKey = mockKey(100L, sequenceId);
      WALEdit walEdit = mockEdit(1000L);
      sink.add(walKey, walEdit, call);
    }
    // the first entry is sent out immediately
    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the first send
    completeRange(replicateFutures, 0, 2);

    // this triggers another batch
    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the second send
    completeRange(replicateFutures, 2, 4);

    // because the count limit is 5, the send above cannot carry all the remaining edits, so one
    // more send is needed
    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());

    // complete the third send
    completeRange(replicateFutures, 4, 6);

    // everything has been sent out, so there are no pending entries
    assertEquals(0, sink.pendingSize());
  }
}