001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.doAnswer; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.times; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.Arrays; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.Cell.Type; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.Server; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.RegionInfoBuilder; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.master.RegionState; 047import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat; 048import org.apache.hadoop.hbase.replication.ReplicationException; 049import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 050import org.apache.hadoop.hbase.replication.ReplicationQueueId; 051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.testclassification.ReplicationTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.wal.WAL.Entry; 058import org.apache.hadoop.hbase.wal.WALKeyImpl; 059import org.junit.AfterClass; 060import org.junit.Before; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Rule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.junit.rules.TestName; 067import org.mockito.invocation.InvocationOnMock; 068import org.mockito.stubbing.Answer; 069 070import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 071 072@Category({ ReplicationTests.class, MediumTests.class }) 073public class TestSerialReplicationChecker { 074 075 @ClassRule 076 public static final HBaseClassTestRule CLASS_RULE = 077 HBaseClassTestRule.forClass(TestSerialReplicationChecker.class); 078 079 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 080 081 private static String PEER_ID = "1"; 082 083 private static ReplicationQueueStorage QUEUE_STORAGE; 084 085 private static String WAL_FILE_NAME = "test.wal"; 086 087 private Connection conn; 088 089 private SerialReplicationChecker checker; 090 091 @Rule 092 public final TestName name = new TestName(); 093 094 private TableName tableName; 095 096 @BeforeClass 097 public static void setUpBeforeClass() throws Exception { 098 UTIL.startMiniCluster(1); 099 TableName repTable = TableName.valueOf("test_serial_rep"); 100 UTIL.getAdmin() 101 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable)); 102 QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), 103 UTIL.getConfiguration(), repTable); 104 } 105 106 @AfterClass 107 public static void tearDownAfterClass() throws Exception { 108 UTIL.shutdownMiniCluster(); 109 } 110 111 @Before 112 public void setUp() throws IOException { 113 ReplicationSource source = mock(ReplicationSource.class); 114 when(source.getPeerId()).thenReturn(PEER_ID); 115 when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE); 116 conn = mock(Connection.class); 117 when(conn.isClosed()).thenReturn(false); 118 doAnswer(new Answer<Table>() { 119 120 @Override 121 public Table answer(InvocationOnMock invocation) throws Throwable { 122 return UTIL.getConnection().getTable((TableName) invocation.getArgument(0)); 123 } 124 125 }).when(conn).getTable(any(TableName.class)); 126 Server server = mock(Server.class); 127 when(server.getConnection()).thenReturn(conn); 128 when(source.getServer()).thenReturn(server); 129 checker = new SerialReplicationChecker(UTIL.getConfiguration(), source); 130 tableName = TableName.valueOf(name.getMethodName()); 131 } 132 133 private Entry createEntry(RegionInfo region, long seqId) { 134 WALKeyImpl key = mock(WALKeyImpl.class); 135 when(key.getTableName()).thenReturn(tableName); 136 when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes()); 137 when(key.getSequenceId()).thenReturn(seqId); 138 Entry entry = mock(Entry.class); 139 when(entry.getKey()).thenReturn(key); 140 return entry; 141 } 142 143 private Cell createCell(RegionInfo region) { 144 return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey()) 145 .setType(Type.Put).build(); 146 } 147 148 @Test 149 public void testNoBarrierCanPush() throws IOException { 150 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 151 assertTrue(checker.canPush(createEntry(region, 100), createCell(region))); 152 } 153 154 private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) 155 throws IOException { 156 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 157 if (state != null) { 158 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 159 Bytes.toBytes(state.name())); 160 } 161 for (int i = 0; i < barriers.length; i++) { 162 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, 163 put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i])); 164 } 165 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 166 table.put(put); 167 } 168 } 169 170 private void setState(RegionInfo region, RegionState.State state) throws IOException { 171 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 172 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, 173 Bytes.toBytes(state.name())); 174 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 175 table.put(put); 176 } 177 } 178 179 private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { 180 ReplicationQueueId queueId = new ReplicationQueueId( 181 UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID); 182 QUEUE_STORAGE.setOffset(queueId, "", new ReplicationGroupOffset(WAL_FILE_NAME, 10), 183 ImmutableMap.of(region.getEncodedName(), seqId)); 184 } 185 186 private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException { 187 Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); 188 put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, 189 ReplicationBarrierFamilyFormat.REPLICATION_PARENT_QUALIFIER, 190 ReplicationBarrierFamilyFormat.getParentsBytes(parents)); 191 try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { 192 table.put(put); 193 } 194 } 195 196 @Test 197 public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException { 198 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 199 addStateAndBarrier(region, RegionState.State.OPEN, 10); 200 Cell cell = createCell(region); 201 // can push since we are in the first range 202 assertTrue(checker.canPush(createEntry(region, 100), cell)); 203 setState(region, RegionState.State.OPENING); 204 // can not push since we are in the last range and the state is OPENING 205 assertFalse(checker.canPush(createEntry(region, 102), cell)); 206 addStateAndBarrier(region, RegionState.State.OPEN, 50); 207 // can not push since the previous range has not been finished yet 208 assertFalse(checker.canPush(createEntry(region, 102), cell)); 209 updatePushedSeqId(region, 49); 210 // can push since the previous range has been finished 211 assertTrue(checker.canPush(createEntry(region, 102), cell)); 212 setState(region, RegionState.State.OPENING); 213 assertFalse(checker.canPush(createEntry(region, 104), cell)); 214 } 215 216 @Test 217 public void testCanPushUnder() throws IOException, ReplicationException { 218 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 219 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 220 updatePushedSeqId(region, 9); 221 Cell cell = createCell(region); 222 assertTrue(checker.canPush(createEntry(region, 20), cell)); 223 verify(conn, times(1)).getTable(any(TableName.class)); 224 // not continuous 225 for (int i = 22; i < 100; i += 2) { 226 assertTrue(checker.canPush(createEntry(region, i), cell)); 227 } 228 // verify that we do not go to meta table 229 verify(conn, times(1)).getTable(any(TableName.class)); 230 } 231 232 @Test 233 public void testCanPushIfContinuous() throws IOException, ReplicationException { 234 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 235 addStateAndBarrier(region, RegionState.State.OPEN, 10); 236 updatePushedSeqId(region, 9); 237 Cell cell = createCell(region); 238 assertTrue(checker.canPush(createEntry(region, 20), cell)); 239 verify(conn, times(1)).getTable(any(TableName.class)); 240 // continuous 241 for (int i = 21; i < 100; i++) { 242 assertTrue(checker.canPush(createEntry(region, i), cell)); 243 } 244 // verify that we do not go to meta table 245 verify(conn, times(1)).getTable(any(TableName.class)); 246 } 247 248 @Test 249 public void testCanPushAfterMerge() throws IOException, ReplicationException { 250 // 0xFF is the escape byte when storing region name so let's make sure it can work. 251 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 252 RegionInfo regionA = 253 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build(); 254 RegionInfo regionB = 255 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build(); 256 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build(); 257 addStateAndBarrier(regionA, null, 10, 100); 258 addStateAndBarrier(regionB, null, 20, 200); 259 addStateAndBarrier(region, RegionState.State.OPEN, 200); 260 addParents(region, Arrays.asList(regionA, regionB)); 261 Cell cell = createCell(region); 262 // can not push since both parents have not been finished yet 263 assertFalse(checker.canPush(createEntry(region, 300), cell)); 264 updatePushedSeqId(regionB, 199); 265 // can not push since regionA has not been finished yet 266 assertFalse(checker.canPush(createEntry(region, 300), cell)); 267 updatePushedSeqId(regionA, 99); 268 // can push since all parents have been finished 269 assertTrue(checker.canPush(createEntry(region, 300), cell)); 270 } 271 272 @Test 273 public void testCanPushAfterSplit() throws IOException, ReplicationException { 274 // 0xFF is the escape byte when storing region name so let's make sure it can work. 275 byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 }; 276 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build(); 277 RegionInfo regionA = 278 RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build(); 279 RegionInfo regionB = 280 RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build(); 281 addStateAndBarrier(region, null, 10, 100); 282 addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200); 283 addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300); 284 addParents(regionA, Arrays.asList(region)); 285 addParents(regionB, Arrays.asList(region)); 286 Cell cellA = createCell(regionA); 287 Cell cellB = createCell(regionB); 288 // can not push since parent has not been finished yet 289 assertFalse(checker.canPush(createEntry(regionA, 150), cellA)); 290 assertFalse(checker.canPush(createEntry(regionB, 200), cellB)); 291 updatePushedSeqId(region, 99); 292 // can push since parent has been finished 293 assertTrue(checker.canPush(createEntry(regionA, 150), cellA)); 294 assertTrue(checker.canPush(createEntry(regionB, 200), cellB)); 295 } 296 297 @Test 298 public void testCanPushEqualsToBarrier() throws IOException, ReplicationException { 299 // For binary search, equals to an element will result to a positive value, let's test whether 300 // it works. 301 RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); 302 addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); 303 Cell cell = createCell(region); 304 assertTrue(checker.canPush(createEntry(region, 10), cell)); 305 assertFalse(checker.canPush(createEntry(region, 100), cell)); 306 updatePushedSeqId(region, 99); 307 assertTrue(checker.canPush(createEntry(region, 100), cell)); 308 } 309}