001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 031import org.apache.hadoop.hbase.procedure2.Procedure; 032import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 033import org.apache.hadoop.hbase.replication.ReplicationQueueData; 034import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 035import org.apache.hadoop.hbase.replication.TestReplicationBase; 036import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; 037import org.apache.hadoop.hbase.testclassification.LargeTests; 038import org.apache.hadoop.hbase.testclassification.MasterTests; 039import org.apache.hadoop.hbase.zookeeper.ZKUtil; 040import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 041import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 047 048@Category({ MasterTests.class, LargeTests.class }) 049public class TestMigrateReplicationQueue extends TestReplicationBase { 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class); 054 055 private int disableAndInsert() throws Exception { 056 UTIL1.getAdmin().disableReplicationPeer(PEER_ID2); 057 return UTIL1.loadTable(htable1, famName); 058 } 059 060 private String getQueuesZNode() throws IOException { 061 Configuration conf = UTIL1.getConfiguration(); 062 ZKWatcher zk = UTIL1.getZooKeeperWatcher(); 063 String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, 064 conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE, 065 ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT)); 066 return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs")); 067 } 068 069 private void mockData() throws Exception { 070 // delete the replication queue table to simulate upgrading from an older version of hbase 071 TableName replicationQueueTableName = TableName 072 .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, 073 ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); 074 List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster() 075 .getReplicationPeerManager().getQueueStorage().listAllQueues(); 076 assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size()); 077 UTIL1.getAdmin().disableTable(replicationQueueTableName); 078 UTIL1.getAdmin().deleteTable(replicationQueueTableName); 079 // shutdown the hbase cluster 080 UTIL1.shutdownMiniHBaseCluster(); 081 ZKWatcher zk = UTIL1.getZooKeeperWatcher(); 082 String queuesZNode = getQueuesZNode(); 083 for (ReplicationQueueData queueData : queueDatas) { 084 String replicatorZNode = 085 ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString()); 086 String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId()); 087 assertEquals(1, queueData.getOffsets().size()); 088 ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values()); 089 String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal()); 090 ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset())); 091 } 092 } 093 094 @Test 095 public void testMigrate() throws Exception { 096 int count = disableAndInsert(); 097 mockData(); 098 restartSourceCluster(1); 099 UTIL1.waitFor(60000, 100 () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() 101 .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny() 102 .map(Procedure::isSuccess).orElse(false)); 103 TableName replicationQueueTableName = TableName 104 .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, 105 ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); 106 assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName)); 107 ZKWatcher zk = UTIL1.getZooKeeperWatcher(); 108 assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode())); 109 // wait until SCP finishes, which means we can finish the claim queue operation 110 UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() 111 .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); 112 List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster() 113 .getReplicationPeerManager().getQueueStorage().listAllQueues(); 114 assertEquals(1, queueDatas.size()); 115 // should have 1 recovered queue, as we haven't replicated anything out so there is no queue 116 // data for the new alive region server 117 assertTrue(queueDatas.get(0).getId().isRecovered()); 118 assertEquals(1, queueDatas.get(0).getOffsets().size()); 119 // the peer is still disabled, so no data has been replicated 120 assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2)); 121 assertEquals(0, HBaseTestingUtil.countRows(htable2)); 122 // enable peer, and make sure the replication can continue correctly 123 UTIL1.getAdmin().enableReplicationPeer(PEER_ID2); 124 waitForReplication(count, 100); 125 } 126}