001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyInt; 024import static org.mockito.ArgumentMatchers.anyList; 025import static org.mockito.ArgumentMatchers.anyLong; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.ArrayDeque; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.List; 037import java.util.Queue; 038import java.util.UUID; 039import java.util.concurrent.CompletableFuture; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ExtendedCellScanner; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNameTestRule; 050import org.apache.hadoop.hbase.client.AsyncClusterConnection; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.Get; 053import org.apache.hadoop.hbase.client.Put; 054import org.apache.hadoop.hbase.client.RegionInfo; 055import org.apache.hadoop.hbase.client.RegionInfoBuilder; 056import org.apache.hadoop.hbase.client.RegionReplicaUtil; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.executor.ExecutorService; 060import org.apache.hadoop.hbase.executor.ExecutorType; 061import org.apache.hadoop.hbase.monitoring.MonitoredTask; 062import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 063import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 064import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 065import org.apache.hadoop.hbase.testclassification.MediumTests; 066import org.apache.hadoop.hbase.testclassification.RegionServerTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.Pair; 069import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 070import org.apache.hadoop.hbase.wal.WAL; 071import org.apache.hadoop.hbase.wal.WALFactory; 072import org.junit.After; 073import org.junit.AfterClass; 074import org.junit.Before; 075import org.junit.BeforeClass; 076import org.junit.ClassRule; 077import org.junit.Rule; 078import org.junit.Test; 079import org.junit.experimental.categories.Category; 080 081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 083 084@Category({ RegionServerTests.class, MediumTests.class }) 085public class TestReplicateToReplica { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestReplicateToReplica.class); 090 091 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 092 093 private static byte[] FAMILY = Bytes.toBytes("family"); 094 095 private static byte[] QUAL = Bytes.toBytes("qualifier"); 096 097 private static ExecutorService EXEC; 098 099 @Rule 100 public final TableNameTestRule name = new TableNameTestRule(); 101 102 private TableName tableName; 103 104 private Path testDir; 105 106 private TableDescriptor td; 107 108 private RegionServerServices rss; 109 110 private AsyncClusterConnection conn; 111 112 private RegionReplicationBufferManager manager; 113 114 private FlushRequester flushRequester; 115 116 private HRegion primary; 117 118 private HRegion secondary; 119 120 private WALFactory walFactory; 121 122 private boolean queueReqAndResps; 123 124 private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps; 125 126 private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH; 127 128 public static final class HRegionForTest extends HRegion { 129 130 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 131 TableDescriptor htd, RegionServerServices rsServices) { 132 super(fs, wal, confParam, htd, rsServices); 133 } 134 135 @SuppressWarnings("deprecation") 136 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 137 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 138 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 139 } 140 141 @Override 142 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 143 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 144 FlushLifeCycleTracker tracker) throws IOException { 145 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 146 status, writeFlushWalMarker, tracker); 147 for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) { 148 put(put); 149 } 150 TO_ADD_AFTER_PREPARE_FLUSH.clear(); 151 return result; 152 } 153 154 } 155 156 @BeforeClass 157 public static void setUpBeforeClass() { 158 Configuration conf = UTIL.getConfiguration(); 159 conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1); 160 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 161 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); 162 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 163 EXEC = new ExecutorService("test"); 164 EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1) 165 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 166 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 167 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 168 } 169 170 @AfterClass 171 public static void tearDownAfterClass() { 172 EXEC.shutdown(); 173 UTIL.cleanupTestDir(); 174 } 175 176 @Before 177 public void setUp() throws IOException { 178 TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>(); 179 tableName = name.getTableName(); 180 testDir = UTIL.getDataTestDir(tableName.getNameAsString()); 181 Configuration conf = UTIL.getConfiguration(); 182 conf.set(HConstants.HBASE_DIR, testDir.toString()); 183 184 td = TableDescriptorBuilder.newBuilder(tableName) 185 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2) 186 .setRegionMemStoreReplication(true).build(); 187 188 reqAndResps = new ArrayDeque<>(); 189 queueReqAndResps = true; 190 conn = mock(AsyncClusterConnection.class); 191 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> { 192 if (queueReqAndResps) { 193 @SuppressWarnings("unchecked") 194 List<WAL.Entry> entries = i.getArgument(1, List.class); 195 CompletableFuture<Void> future = new CompletableFuture<>(); 196 reqAndResps.add(Pair.newPair(entries, future)); 197 return future; 198 } else { 199 return CompletableFuture.completedFuture(null); 200 } 201 }); 202 203 flushRequester = mock(FlushRequester.class); 204 205 rss = mock(RegionServerServices.class); 206 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 207 when(rss.getConfiguration()).thenReturn(conf); 208 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf)); 209 when(rss.getExecutorService()).thenReturn(EXEC); 210 when(rss.getAsyncClusterConnection()).thenReturn(conn); 211 when(rss.getFlushRequester()).thenReturn(flushRequester); 212 213 manager = new RegionReplicationBufferManager(rss); 214 when(rss.getRegionReplicationBufferManager()).thenReturn(manager); 215 216 RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 217 RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1); 218 219 walFactory = new WALFactory(conf, UUID.randomUUID().toString()); 220 WAL wal = walFactory.getWAL(primaryHri); 221 primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal); 222 primary.close(); 223 224 primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null); 225 secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null); 226 227 when(rss.getRegions()).then(i -> { 228 return Arrays.asList(primary, secondary); 229 }); 230 231 // process the open events 232 replicateAll(); 233 } 234 235 @After 236 public void tearDown() throws IOException { 237 // close region will issue a flush, which will enqueue an edit into the replication sink so we 238 // need to complete it otherwise the test will hang. 239 queueReqAndResps = false; 240 failAll(); 241 HBaseTestingUtil.closeRegionAndWAL(primary); 242 HBaseTestingUtil.closeRegionAndWAL(secondary); 243 if (walFactory != null) { 244 walFactory.close(); 245 } 246 } 247 248 private FlushResult flushPrimary() throws IOException { 249 return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 250 } 251 252 private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException { 253 Pair<ReplicateWALEntryRequest, 254 ExtendedCellScanner> params = ReplicationProtobufUtil.buildReplicateWALEntryRequest( 255 pair.getFirst().toArray(new WAL.Entry[0]), 256 secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null); 257 for (WALEntry entry : params.getFirst().getEntryList()) { 258 secondary.replayWALEntry(entry, params.getSecond()); 259 } 260 pair.getSecond().complete(null); 261 } 262 263 private void replicateOne() throws IOException { 264 replicate(reqAndResps.remove()); 265 } 266 267 private void replicateAll() throws IOException { 268 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) { 269 pair = reqAndResps.poll(); 270 if (pair == null) { 271 break; 272 } 273 replicate(pair); 274 } 275 } 276 277 private void failOne() { 278 reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error")); 279 } 280 281 private void failAll() { 282 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) { 283 pair = reqAndResps.poll(); 284 if (pair == null) { 285 break; 286 } 287 pair.getSecond().completeExceptionally(new IOException("Inject error")); 288 } 289 } 290 291 @Test 292 public void testNormalReplicate() throws IOException { 293 byte[] row = Bytes.toBytes(0); 294 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 295 replicateOne(); 296 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 297 } 298 299 @Test 300 public void testNormalFlush() throws IOException { 301 byte[] row = Bytes.toBytes(0); 302 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 303 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 304 flushPrimary(); 305 replicateAll(); 306 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 307 308 // we should have the same memstore size, i.e, the secondary should have also dropped the 309 // snapshot 310 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); 311 } 312 313 @Test 314 public void testErrorBeforeFlushStart() throws IOException { 315 byte[] row = Bytes.toBytes(0); 316 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 317 failOne(); 318 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 319 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 320 flushPrimary(); 321 // this also tests start flush with empty memstore at secondary replica side 322 replicateAll(); 323 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 324 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); 325 } 326 327 @Test 328 public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException { 329 primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 330 replicateAll(); 331 TO_ADD_AFTER_PREPARE_FLUSH 332 .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 333 flushPrimary(); 334 // replicate the start flush edit 335 replicateOne(); 336 // fail the remaining edits, the put and the commit flush edit 337 failOne(); 338 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 339 primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3))); 340 flushPrimary(); 341 replicateAll(); 342 for (int i = 0; i < 3; i++) { 343 assertEquals(i + 1, 344 Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL))); 345 } 346 // should have nothing in memstore 347 assertEquals(0, secondary.getMemStoreDataSize()); 348 } 349 350 @Test 351 public void testCatchUpWithCannotFlush() throws IOException, InterruptedException { 352 byte[] row = Bytes.toBytes(0); 353 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 354 failOne(); 355 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 356 flushPrimary(); 357 failAll(); 358 Thread.sleep(2000); 359 // we will request flush the second time 360 verify(flushRequester, times(2)).requestFlush(any(), anyList(), any()); 361 // we can not flush because no content in memstore 362 FlushResult result = flushPrimary(); 363 assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult()); 364 // the secondary replica does not have this row yet 365 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); 366 // replicate the can not flush edit 367 replicateOne(); 368 // we should have the row now 369 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 370 } 371 372 @Test 373 public void testCatchUpWithReopen() throws IOException { 374 byte[] row = Bytes.toBytes(0); 375 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 376 failOne(); 377 primary.close(); 378 // the secondary replica does not have this row yet, although the above close has flushed the 379 // data out 380 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); 381 382 // reopen 383 primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(), 384 UTIL.getConfiguration(), rss, null); 385 replicateAll(); 386 // we should have the row now 387 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 388 } 389}