/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link ReplicationSourceManager}.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {

    private String clusterKey;

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // To block replication (for example, when we do not want a recovered source to be removed),
      // configure the peer with a cluster key that ends with "error".
      if (clusterKey.endsWith("error")) {
        throw new RuntimeException("Inject error");
      }
      return true;
    }

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
    }

  }

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static Configuration CONF;

  private static FileSystem FS;

  private static final byte[] F1 = Bytes.toBytes("f1");

  private static final byte[] F2 = Bytes.toBytes("f2");

  private static final TableName TABLE_NAME = TableName.valueOf("test");

  private static RegionInfo RI;

  private static NavigableMap<byte[], Integer> SCOPES;

  @Rule
  public final TestName name = new TestName();

  private Path oldLogDir;

  private Path logDir;

  private Path remoteLogDir;

  private Server server;

  private Replication replication;

  private ReplicationSourceManager manager;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    FS = UTIL.getTestFileSystem();
    CONF = new Configuration(UTIL.getConfiguration());
    CONF.setLong("replication.sleep.before.failover", 0);

    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SCOPES.put(F1, 1);
    SCOPES.put(F2, 0);
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
    CommonFSUtils.setRootDir(CONF, rootDir);
    server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(CONF);
    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
    when(server.getConnection()).thenReturn(UTIL.getConnection());
    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
    when(server.getServerName()).thenReturn(sn);
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    FS.mkdirs(oldLogDir);
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
    FS.mkdirs(logDir);
    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
    FS.mkdirs(remoteLogDir);
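    // each test method gets its own replication queue storage table, named after the test method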
    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());

    replication = new Replication();
    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
      new WALFactory(CONF, server.getServerName(), null));
    manager = replication.getReplicationManager();
  }

  @After
  public void tearDown() {
    replication.stopReplicationService();
  }

  /**
   * Add a peer and wait for it to initialize.
   */
  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
    throws ReplicationException, IOException {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
    if (syncRep) {
      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
    }

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
    manager.addPeer(peerId);
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      return rs != null && rs.isSourceActive();
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up.
   */
  private void removePeerAndWait(String peerId) throws Exception {
    ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().removePeer(peerId);
    manager.removePeer(peerId);
    UTIL.waitFor(20000, () -> {
      if (rp.getPeer(peerId) != null) {
        return false;
      }
      if (manager.getSource(peerId) != null) {
        return false;
      }
      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
    });
  }

  /**
   * Write a WAL file with a single entry at the given path.
   */
  private void createWALFile(Path file) throws Exception {
    ProtobufLogWriter writer = new ProtobufLogWriter();
    try {
      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
        EnvironmentEdgeManager.currentTime(), SCOPES);
      WALEdit edit = new WALEdit();
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
          .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
          .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
      writer.append(new WAL.Entry(key, edit));
      writer.sync(false);
    } finally {
      writer.close();
    }
  }

  /**
   * Claim the replication queue of a dead region server and verify that a recovered (old) source
   * is created for it.
   */
  @Test
  public void testClaimQueue() throws Exception {
    String peerId = "1";
    addPeerAndWait(peerId, "error", false);
    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
    String walName1 = serverName.toString() + ".1";
    createWALFile(new Path(oldLogDir, walName1));
    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
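    // persist an offset for the dead region server's queue so there is queue data to claim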
    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
      Collections.emptyMap());
    manager.claimQueue(queueId);
    assertThat(manager.getOldSources(), hasSize(1));
  }

  /**
   * Both WALs should be tracked as latest paths even though their names share the same leading
   * server name.
   */
  @Test
  public void testSameWALPrefix() throws IOException {
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.postLogRoll(new Path(walName1));
    manager.postLogRoll(new Path(walName2));

    Set<String> latestWals =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    assertThat(latestWals,
      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
  }

  private MetricsReplicationSourceSource getGlobalSource() {
    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
      .getGlobalSource();
  }

  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    MetricsReplicationSourceSource globalSource = getGlobalSource();
    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
    String peerId = "DummyPeer";
    addPeerAndWait(peerId, "hbase", false);
    // there are no latest paths yet, so the size of the log queue should not change
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    ReplicationSourceInterface source = manager.getSource(peerId);
    // Sanity check
    assertNotNull(source);
    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
    // Enqueue a log and check that the metrics are updated
    Path serverLogDir = new Path(logDir, server.getServerName().toString());
    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());

    // Removing the peer should reset the global metrics
    removePeerAndWait(peerId);
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    // Adding the same peer back again should reset the single source metrics
    addPeerAndWait(peerId, "hbase", false);
    source = manager.getSource(peerId);
    assertNotNull(source);
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());
  }

  @Test
  public void testDisablePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      addPeerAndWait(peerId, "hbase", false);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue a log and check that the metrics are updated
      Path serverLogDir = new Path(logDir, server.getServerName().toString());
      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Refreshing the peer should decrement the global and single source metrics
      manager.refreshSources(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Cleaning old logs for a synchronous replication peer should also remove the corresponding
   * remote WAL.
   */
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId = "2";
    addPeerAndWait(peerId, "hbase", true);
    // make sure that we can deal with files which do not exist
    String walNameNotExists =
      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path wal = new Path(logDir, walNameNotExists);
    manager.postLogRoll(wal);

    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
    FS.mkdirs(remoteLogDirForPeer);
    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path remoteWAL =
      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
    FS.create(remoteWAL).close();
    wal = new Path(logDir, walName);
    manager.postLogRoll(wal);

    ReplicationSourceInterface source = manager.getSource(peerId);
    manager.cleanOldLogs(walName, true, source);
    assertFalse(FS.exists(remoteWAL));
  }
}