001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.hasItem; 022import static org.hamcrest.Matchers.hasSize; 023import static org.hamcrest.Matchers.not; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.stream.Collectors; 036import java.util.stream.IntStream; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.TableNameTestRule; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.ReplicationTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.MD5Hash; 050import org.apache.hadoop.hbase.util.Pair; 051import org.apache.zookeeper.KeeperException; 052import org.hamcrest.Matchers; 053import org.hamcrest.collection.IsEmptyCollection; 054import org.junit.AfterClass; 055import org.junit.Before; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 065import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 066 067@Category({ ReplicationTests.class, MediumTests.class }) 068public class TestTableReplicationQueueStorage { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestTableReplicationQueueStorage.class); 073 074 private static final Logger LOG = LoggerFactory.getLogger(TestTableReplicationQueueStorage.class); 075 076 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 077 078 @Rule 079 public TableNameTestRule tableNameRule = new TableNameTestRule(); 080 081 private TableReplicationQueueStorage storage; 082 083 @BeforeClass 084 public static void setUp() throws Exception { 085 UTIL.startMiniCluster(); 086 } 087 088 @AfterClass 089 public static void tearDown() throws IOException { 090 UTIL.shutdownMiniCluster(); 091 } 092 093 @Before 094 public void setUpBeforeTest() throws Exception { 095 TableName tableName = tableNameRule.getTableName(); 096 TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); 097 UTIL.getAdmin().createTable(td); 098 UTIL.waitTableAvailable(tableName); 099 storage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName); 100 } 101 102 private ServerName getServerName(int i) { 103 return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); 104 } 105 106 private String getFileName(String base, int i) { 107 return String.format(base + "-%04d", i); 108 } 109 110 @Test 111 public void testReplicator() throws ReplicationException { 112 assertTrue(storage.listAllReplicators().isEmpty()); 113 String peerId = "1"; 114 for (int i = 0; i < 10; i++) { 115 ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId); 116 storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("file-" + i, i * 100), 117 Collections.emptyMap()); 118 } 119 List<ServerName> replicators = storage.listAllReplicators(); 120 assertEquals(10, replicators.size()); 121 for (int i = 0; i < 10; i++) { 122 assertThat(replicators, hasItem(getServerName(i))); 123 } 124 for (int i = 0; i < 5; i++) { 125 ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId); 126 storage.removeQueue(queueId); 127 } 128 replicators = storage.listAllReplicators(); 129 assertEquals(5, replicators.size()); 130 for (int i = 0; i < 5; i++) { 131 assertThat(replicators, not(hasItem(getServerName(i)))); 132 } 133 for (int i = 5; i < 10; i++) { 134 assertThat(replicators, hasItem(getServerName(i))); 135 } 136 } 137 138 @Test 139 public void testGetSetOffset() { 140 141 } 142 143 private void assertQueueId(String peerId, ServerName serverName, ReplicationQueueId queueId) { 144 assertEquals(peerId, queueId.getPeerId()); 145 assertEquals(serverName, queueId.getServerName()); 146 assertFalse(queueId.getSourceServerName().isPresent()); 147 } 148 149 @Test 150 public void testPersistLogPositionAndSeqIdAtomically() throws Exception { 151 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 152 assertTrue(storage.listAllQueueIds(serverName1).isEmpty()); 153 String peerId1 = "1"; 154 String region0 = "6b2c8f8555335cc9af74455b94516cbe"; 155 String region1 = "6ecd2e9e010499f8ddef97ee8f70834f"; 156 157 for (int i = 0; i < 10; i++) { 158 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1); 159 assertTrue(storage.getOffsets(queueId).isEmpty()); 160 } 161 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region0, peerId1)); 162 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region1, peerId1)); 163 164 for (int i = 0; i < 10; i++) { 165 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1); 166 storage.setOffset(queueId, "group1-" + i, 167 new ReplicationGroupOffset(getFileName("file1", i), (i + 1) * 100), 168 ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); 169 } 170 171 List<ReplicationQueueId> queueIds = storage.listAllQueueIds(serverName1); 172 assertEquals(1, queueIds.size()); 173 assertQueueId(peerId1, serverName1, queueIds.get(0)); 174 175 Map<String, ReplicationGroupOffset> offsets = 176 storage.getOffsets(new ReplicationQueueId(serverName1, peerId1)); 177 for (int i = 0; i < 10; i++) { 178 ReplicationGroupOffset offset = offsets.get("group1-" + i); 179 assertEquals(getFileName("file1", i), offset.getWal()); 180 assertEquals((i + 1) * 100, offset.getOffset()); 181 } 182 assertEquals(900L, storage.getLastSequenceId(region0, peerId1)); 183 assertEquals(1000L, storage.getLastSequenceId(region1, peerId1)); 184 185 // Try to decrease the last pushed id by setWALPosition method. 186 storage.setOffset(new ReplicationQueueId(serverName1, peerId1), "group1-0", 187 new ReplicationGroupOffset(getFileName("file1", 0), 11 * 100), 188 ImmutableMap.of(region0, 899L, region1, 1001L)); 189 assertEquals(900L, storage.getLastSequenceId(region0, peerId1)); 190 assertEquals(1001L, storage.getLastSequenceId(region1, peerId1)); 191 } 192 193 private void assertGroupOffset(String wal, long offset, ReplicationGroupOffset groupOffset) { 194 assertEquals(wal, groupOffset.getWal()); 195 assertEquals(offset, groupOffset.getOffset()); 196 } 197 198 @Test 199 public void testClaimQueue() throws Exception { 200 String peerId = "1"; 201 ServerName serverName1 = getServerName(1); 202 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId); 203 for (int i = 0; i < 10; i++) { 204 storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("wal-" + i, i), 205 Collections.emptyMap()); 206 } 207 208 ServerName serverName2 = getServerName(2); 209 Map<String, ReplicationGroupOffset> offsets2 = storage.claimQueue(queueId, serverName2); 210 assertEquals(10, offsets2.size()); 211 for (int i = 0; i < 10; i++) { 212 assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i)); 213 } 214 ReplicationQueueId claimedQueueId2 = new ReplicationQueueId(serverName2, peerId, serverName1); 215 assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty()); 216 assertThat(storage.listAllQueueIds(peerId, serverName2), 217 Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId2)).and(hasSize(1))); 218 offsets2 = storage.getOffsets(claimedQueueId2); 219 assertEquals(10, offsets2.size()); 220 for (int i = 0; i < 10; i++) { 221 assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i)); 222 } 223 224 ServerName serverName3 = getServerName(3); 225 Map<String, ReplicationGroupOffset> offsets3 = storage.claimQueue(claimedQueueId2, serverName3); 226 assertEquals(10, offsets3.size()); 227 for (int i = 0; i < 10; i++) { 228 assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i)); 229 } 230 ReplicationQueueId claimedQueueId3 = new ReplicationQueueId(serverName3, peerId, serverName1); 231 assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty()); 232 assertThat(storage.listAllQueueIds(peerId, serverName2), IsEmptyCollection.empty()); 233 assertThat(storage.listAllQueueIds(peerId, serverName3), 234 Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId3)).and(hasSize(1))); 235 offsets3 = storage.getOffsets(claimedQueueId3); 236 assertEquals(10, offsets3.size()); 237 for (int i = 0; i < 10; i++) { 238 assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i)); 239 } 240 storage.removeQueue(claimedQueueId3); 241 assertThat(storage.listAllQueueIds(peerId), IsEmptyCollection.empty()); 242 } 243 244 @Test 245 public void testClaimQueueMultiThread() throws Exception { 246 String peerId = "3"; 247 String walGroup = "group"; 248 ReplicationGroupOffset groupOffset = new ReplicationGroupOffset("wal", 123); 249 ServerName sourceServerName = getServerName(100); 250 ReplicationQueueId queueId = new ReplicationQueueId(sourceServerName, peerId); 251 storage.setOffset(queueId, walGroup, groupOffset, Collections.emptyMap()); 252 List<ServerName> serverNames = 253 IntStream.range(0, 10).mapToObj(this::getServerName).collect(Collectors.toList()); 254 for (int i = 0; i < 10; i++) { 255 final ReplicationQueueId toClaim = queueId; 256 List<Thread> threads = new ArrayList<>(); 257 Map<ServerName, Map<String, ReplicationGroupOffset>> claimed = new ConcurrentHashMap<>(); 258 Set<ServerName> failed = ConcurrentHashMap.newKeySet(); 259 for (ServerName serverName : serverNames) { 260 if (serverName.equals(queueId.getServerName())) { 261 continue; 262 } 263 threads.add(new Thread("Claim-" + i + "-" + serverName) { 264 265 @Override 266 public void run() { 267 try { 268 Map<String, ReplicationGroupOffset> offsets = storage.claimQueue(toClaim, serverName); 269 if (!offsets.isEmpty()) { 270 claimed.put(serverName, offsets); 271 } 272 } catch (ReplicationException e) { 273 LOG.error("failed to claim queue", e); 274 failed.add(serverName); 275 } 276 } 277 }); 278 } 279 LOG.info("Claim round {}, there are {} threads to claim {}", i, threads.size(), toClaim); 280 for (Thread thread : threads) { 281 thread.start(); 282 } 283 for (Thread thread : threads) { 284 thread.join(30000); 285 assertFalse(thread.isAlive()); 286 } 287 LOG.info("Finish claim round {}, claimed={}, failed={}", i, claimed, failed); 288 assertThat(failed, IsEmptyCollection.empty()); 289 assertEquals(1, claimed.size()); 290 Map<String, ReplicationGroupOffset> offsets = Iterables.getOnlyElement(claimed.values()); 291 assertEquals(1, offsets.size()); 292 assertGroupOffset("wal", 123, offsets.get("group")); 293 queueId = new ReplicationQueueId(Iterables.getOnlyElement(claimed.keySet()), peerId, 294 sourceServerName); 295 assertThat(storage.listAllQueueIds(peerId), 296 Matchers.<List<ReplicationQueueId>> both(hasItem(queueId)).and(hasSize(1))); 297 } 298 } 299 300 @Test 301 public void testListRemovePeerAllQueues() throws Exception { 302 String peerId1 = "1"; 303 String peerId2 = "2"; 304 for (int i = 0; i < 100; i++) { 305 ServerName serverName = getServerName(i); 306 String group = "group"; 307 ReplicationGroupOffset offset = new ReplicationGroupOffset("wal", i); 308 ReplicationQueueId queueId1 = new ReplicationQueueId(serverName, peerId1); 309 ReplicationQueueId queueId2 = new ReplicationQueueId(serverName, peerId2); 310 storage.setOffset(queueId1, group, offset, Collections.emptyMap()); 311 storage.setOffset(queueId2, group, offset, Collections.emptyMap()); 312 } 313 List<ReplicationQueueData> queueDatas = storage.listAllQueues(); 314 assertThat(queueDatas, hasSize(200)); 315 for (int i = 0; i < 100; i++) { 316 ReplicationQueueData peerId1Data = queueDatas.get(i); 317 ReplicationQueueData peerId2Data = queueDatas.get(i + 100); 318 ServerName serverName = getServerName(i); 319 assertEquals(new ReplicationQueueId(serverName, peerId1), peerId1Data.getId()); 320 assertEquals(new ReplicationQueueId(serverName, peerId2), peerId2Data.getId()); 321 assertEquals(1, peerId1Data.getOffsets().size()); 322 assertEquals(1, peerId2Data.getOffsets().size()); 323 assertGroupOffset("wal", i, peerId1Data.getOffsets().get("group")); 324 assertGroupOffset("wal", i, peerId2Data.getOffsets().get("group")); 325 } 326 List<ReplicationQueueId> queueIds1 = storage.listAllQueueIds(peerId1); 327 assertThat(queueIds1, hasSize(100)); 328 for (int i = 0; i < 100; i++) { 329 ServerName serverName = getServerName(i); 330 assertEquals(new ReplicationQueueId(serverName, peerId1), queueIds1.get(i)); 331 } 332 List<ReplicationQueueId> queueIds2 = storage.listAllQueueIds(peerId2); 333 assertThat(queueIds2, hasSize(100)); 334 for (int i = 0; i < 100; i++) { 335 ServerName serverName = getServerName(i); 336 assertEquals(new ReplicationQueueId(serverName, peerId2), queueIds2.get(i)); 337 } 338 339 storage.removeAllQueues(peerId1); 340 assertThat(storage.listAllQueues(), hasSize(100)); 341 assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty()); 342 assertThat(storage.listAllQueueIds(peerId2), hasSize(100)); 343 344 storage.removeAllQueues(peerId2); 345 assertThat(storage.listAllQueues(), IsEmptyCollection.empty()); 346 assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty()); 347 assertThat(storage.listAllQueueIds(peerId2), IsEmptyCollection.empty()); 348 } 349 350 @Test 351 public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception { 352 String peerId = "1"; 353 String peerIdToDelete = "2"; 354 for (int i = 0; i < 100; i++) { 355 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 356 storage.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i)); 357 storage.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i)); 358 } 359 for (int i = 0; i < 100; i++) { 360 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 361 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId)); 362 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerIdToDelete)); 363 } 364 storage.removeLastSequenceIds(peerIdToDelete); 365 for (int i = 0; i < 100; i++) { 366 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 367 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId)); 368 assertEquals(HConstants.NO_SEQNUM, 369 storage.getLastSequenceId(encodedRegionName, peerIdToDelete)); 370 } 371 } 372 373 @Test 374 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { 375 String peerId1 = "1"; 376 377 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 378 files1.add(new Pair<>(null, new Path("file_1"))); 379 files1.add(new Pair<>(null, new Path("file_2"))); 380 files1.add(new Pair<>(null, new Path("file_3"))); 381 assertTrue(storage.getReplicableHFiles(peerId1).isEmpty()); 382 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 383 384 storage.addHFileRefs(peerId1, files1); 385 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 386 assertEquals(3, storage.getReplicableHFiles(peerId1).size()); 387 List<String> hfiles2 = new ArrayList<>(files1.size()); 388 for (Pair<Path, Path> p : files1) { 389 hfiles2.add(p.getSecond().getName()); 390 } 391 String removedString = hfiles2.remove(0); 392 storage.removeHFileRefs(peerId1, hfiles2); 393 assertEquals(1, storage.getReplicableHFiles(peerId1).size()); 394 hfiles2 = new ArrayList<>(1); 395 hfiles2.add(removedString); 396 storage.removeHFileRefs(peerId1, hfiles2); 397 assertEquals(0, storage.getReplicableHFiles(peerId1).size()); 398 } 399 400 @Test 401 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { 402 String peerId1 = "1"; 403 String peerId2 = "2"; 404 405 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 406 files1.add(new Pair<>(null, new Path("file_1"))); 407 files1.add(new Pair<>(null, new Path("file_2"))); 408 files1.add(new Pair<>(null, new Path("file_3"))); 409 storage.addHFileRefs(peerId1, files1); 410 storage.addHFileRefs(peerId2, files1); 411 assertEquals(2, storage.getAllPeersFromHFileRefsQueue().size()); 412 assertEquals(3, storage.getReplicableHFiles(peerId1).size()); 413 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 414 415 storage.removePeerFromHFileRefs(peerId1); 416 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 417 assertTrue(storage.getReplicableHFiles(peerId1).isEmpty()); 418 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 419 420 storage.removePeerFromHFileRefs(peerId2); 421 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 422 assertTrue(storage.getReplicableHFiles(peerId2).isEmpty()); 423 } 424 425 private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2) 426 throws ReplicationException { 427 for (int i = 0; i < 100; i++) { 428 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 429 storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i)); 430 } 431 432 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 433 files1.add(new Pair<>(null, new Path("file_1"))); 434 files1.add(new Pair<>(null, new Path("file_2"))); 435 files1.add(new Pair<>(null, new Path("file_3"))); 436 storage.addHFileRefs(peerId2, files1); 437 } 438 439 @Test 440 public void testRemoveLastSequenceIdsAndHFileRefsBefore() 441 throws ReplicationException, InterruptedException { 442 String peerId1 = "1"; 443 String peerId2 = "2"; 444 addLastSequenceIdsAndHFileRefs(peerId1, peerId2); 445 // make sure we have write these out 446 for (int i = 0; i < 100; i++) { 447 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 448 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1)); 449 } 450 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 451 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 452 453 // should have nothing after removal 454 long ts = EnvironmentEdgeManager.currentTime(); 455 storage.removeLastSequenceIdsAndHFileRefsBefore(ts); 456 for (int i = 0; i < 100; i++) { 457 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 458 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1)); 459 } 460 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 461 462 Thread.sleep(100); 463 // add again and remove with the old timestamp 464 addLastSequenceIdsAndHFileRefs(peerId1, peerId2); 465 storage.removeLastSequenceIdsAndHFileRefsBefore(ts); 466 // make sure we do not delete the data which are written after the give timestamp 467 for (int i = 0; i < 100; i++) { 468 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 469 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1)); 470 } 471 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 472 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 473 } 474}