001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
057
058@Category({ MasterTests.class, LargeTests.class })
059public class TestMigrateReplicationQueue extends TestReplicationBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class);
064
065  private int disableAndInsert() throws Exception {
066    UTIL1.getAdmin().disableReplicationPeer(PEER_ID2);
067    return UTIL1.loadTable(htable1, famName);
068  }
069
070  private String getQueuesZNode() throws IOException {
071    Configuration conf = UTIL1.getConfiguration();
072    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
073    String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
074      conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
075        ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
076    return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
077  }
078
079  private void mockData() throws Exception {
080    // fake a region_replica_replication peer and its queue data
081    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
082      .setClusterKey("127.0.0.1:2181:/hbase")
083      .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
084    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
085    master.getReplicationPeerManager()
086      .addPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
087    ServerName rsName = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
088    master.getReplicationPeerManager().getQueueStorage().setOffset(
089      new ReplicationQueueId(rsName, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), "",
090      new ReplicationGroupOffset("test-wal-file", 0), Collections.emptyMap());
091
092    // delete the replication queue table to simulate upgrading from an older version of hbase
093    TableName replicationQueueTableName = TableName
094      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
095        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
096    List<ReplicationQueueData> queueDatas =
097      master.getReplicationPeerManager().getQueueStorage().listAllQueues();
098    // have an extra mocked queue data for region_replica_replication peer
099    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size() + 1,
100      queueDatas.size());
101    UTIL1.getAdmin().disableTable(replicationQueueTableName);
102    UTIL1.getAdmin().deleteTable(replicationQueueTableName);
103    // shutdown the hbase cluster
104    UTIL1.shutdownMiniHBaseCluster();
105    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
106    String queuesZNode = getQueuesZNode();
107    for (ReplicationQueueData queueData : queueDatas) {
108      String replicatorZNode =
109        ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
110      String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
111      assertEquals(1, queueData.getOffsets().size());
112      ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
113      String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
114      ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
115    }
116  }
117
118  @Test
119  public void testMigrate() throws Exception {
120    int count = disableAndInsert();
121    mockData();
122    restartSourceCluster(1);
123    UTIL1.waitFor(60000,
124      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
125        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
126        .map(Procedure::isSuccess).orElse(false));
127    TableName replicationQueueTableName = TableName
128      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
129        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
130    assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
131    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
132    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
133    // wait until MigrateReplicationQueueFromZkToTableProcedure finishes
134    UTIL1.waitFor(15000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
135      .anyMatch(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure));
136    UTIL1.waitFor(60000,
137      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
138        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
139        .allMatch(Procedure::isSuccess));
140    // make sure the region_replica_replication peer is gone, and there is no data on zk
141    assertThat(UTIL1.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getPeerStorage()
142      .listPeerIds(), not(contains(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)));
143    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
144
145    // wait until SCP finishes, which means we can finish the claim queue operation
146    UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
147      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
148
149    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
150      .getReplicationPeerManager().getQueueStorage().listAllQueues();
151    assertEquals(1, queueDatas.size());
152    // should have 1 recovered queue, as we haven't replicated anything out so there is no queue
153    // data for the new alive region server
154    assertTrue(queueDatas.get(0).getId().isRecovered());
155    assertEquals(1, queueDatas.get(0).getOffsets().size());
156    // the peer is still disabled, so no data has been replicated
157    assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
158    assertEquals(0, HBaseTestingUtil.countRows(htable2));
159    // enable peer, and make sure the replication can continue correctly
160    UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
161    waitForReplication(count, 100);
162  }
163}