/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.HbckErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
040import org.junit.After; 041import org.junit.Before; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047 048@Category({ ReplicationTests.class, MediumTests.class }) 049public class TestHBaseFsckReplication { 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestHBaseFsckReplication.class); 054 055 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 056 @Rule 057 public final TestName name = new TestName(); 058 059 @Before 060 public void setUp() throws Exception { 061 UTIL.getConfiguration().setBoolean("hbase.write.hbck1.lock.file", false); 062 UTIL.startMiniCluster(1); 063 TableName tableName = TableName.valueOf("replication_" + name.getMethodName()); 064 UTIL.getAdmin() 065 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); 066 UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, 067 tableName.getNameAsString()); 068 } 069 070 @After 071 public void tearDown() throws Exception { 072 UTIL.shutdownMiniCluster(); 073 } 074 075 @Test 076 public void test() throws Exception { 077 ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage( 078 UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 079 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 080 .getReplicationQueueStorage(UTIL.getConnection(), UTIL.getConfiguration()); 081 082 String peerId1 = "1"; 083 String peerId2 = "2"; 084 peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 085 true, SyncReplicationState.NONE); 086 peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 087 true, SyncReplicationState.NONE); 088 ReplicationQueueId queueId = null; 089 for (int i = 0; i < 10; i++) { 090 queueId 
= new ReplicationQueueId(getServerName(i), peerId1); 091 queueStorage.setOffset(queueId, "group-" + i, 092 new ReplicationGroupOffset("file-" + i, i * 100), Collections.emptyMap()); 093 } 094 queueId = new ReplicationQueueId(getServerName(0), peerId2); 095 queueStorage.setOffset(queueId, "group-" + 0, new ReplicationGroupOffset("file-" + 0, 100), 096 Collections.emptyMap()); 097 HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 098 HbckTestingUtil.assertNoErrors(fsck); 099 100 // should not remove anything since the replication peer is still alive 101 assertEquals(10, queueStorage.listAllReplicators().size()); 102 peerStorage.removePeer(peerId1); 103 // there should be orphan queues 104 assertEquals(10, queueStorage.listAllReplicators().size()); 105 fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false); 106 HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 107 return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 108 }).limit(10).toArray(ERROR_CODE[]::new)); 109 110 // should not delete anything when fix is false 111 assertEquals(10, queueStorage.listAllReplicators().size()); 112 113 fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 114 HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 115 return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 116 }).limit(10).toArray(HbckErrorReporter.ERROR_CODE[]::new)); 117 118 List<ServerName> replicators = queueStorage.listAllReplicators(); 119 // should not remove the server with queue for peerId2 120 assertEquals(1, replicators.size()); 121 assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0)); 122 for (ReplicationQueueId qId : queueStorage.listAllQueueIds(replicators.get(0))) { 123 assertEquals(peerId2, qId.getPeerId()); 124 } 125 } 126 127 private ServerName getServerName(int i) { 128 return ServerName.valueOf("localhost", 10000 + i, 100000 + i); 129 } 130}