001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.hasItem; 022import static org.hamcrest.Matchers.hasSize; 023 024import java.io.IOException; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 031import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; 032import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 033import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 036import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; 037import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; 038import org.apache.hadoop.hbase.testclassification.MasterTests; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.zookeeper.ZKUtil; 041import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 042import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 043import org.hamcrest.Matchers; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052@Category({ MasterTests.class, MediumTests.class }) 053public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedureRecovery.class); 058 059 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 060 061 @BeforeClass 062 public static void setupCluster() throws Exception { 063 UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); 064 UTIL.startMiniCluster(1); 065 } 066 067 @AfterClass 068 public static void cleanupTest() throws Exception { 069 UTIL.shutdownMiniCluster(); 070 } 071 072 private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 073 return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); 074 } 075 076 @Before 077 public void setup() throws Exception { 078 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); 079 } 080 081 @After 082 public void tearDown() throws Exception { 083 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); 084 } 085 086 private String getHFileRefsZNode() throws IOException { 087 Configuration conf = UTIL.getConfiguration(); 088 ZKWatcher zk = UTIL.getZooKeeperWatcher(); 089 String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, 090 conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE, 091 ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT)); 092 return ZNodePaths.joinZNode(replicationZNode, 093 conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, 094 ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT)); 095 } 096 097 @Test 098 public void testRecoveryAndDoubleExecution() throws Exception { 099 String peerId = "2"; 100 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 101 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") 102 .setReplicateAllUserTables(true).build(); 103 UTIL.getAdmin().addReplicationPeer(peerId, rpc); 104 105 // here we only test a simple migration, more complicated migration will be tested in other UTs, 106 // such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateFromZk 107 String hfileRefsZNode = getHFileRefsZNode(); 108 String hfile = "hfile"; 109 String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile); 110 ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode); 111 112 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 113 114 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 115 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); 116 117 // Start the migration procedure && kill the executor 118 long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure()); 119 // Restart the executor and execute the step twice 120 MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId); 121 // Validate the migration result 122 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 123 ReplicationQueueStorage queueStorage = 124 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 125 List<String> hfiles = queueStorage.getReplicableHFiles(peerId); 126 assertThat(hfiles, Matchers.<List<String>> both(hasItem(hfile)).and(hasSize(1))); 127 } 128}