001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 027import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 028import org.apache.hadoop.hbase.zookeeper.ZKConfig; 029import org.junit.Test; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * White box testing for replication state interfaces. Implementations should extend this class, and 035 * initialize the interfaces properly. 036 */ 037public abstract class TestReplicationStateBasic { 038 039 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); 040 041 protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); 042 protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); 043 protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); 044 protected ReplicationPeers rp; 045 protected static final String ID_ONE = "1"; 046 protected static final String ID_TWO = "2"; 047 protected static String KEY_ONE; 048 protected static String KEY_TWO; 049 050 // For testing when we try to replicate to ourself 051 protected String OUR_KEY; 052 053 protected static int zkTimeoutCount; 054 protected static final int ZK_MAX_COUNT = 300; 055 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 056 057 @Test 058 public void testReplicationPeers() throws Exception { 059 rp.init(); 060 061 try { 062 rp.getPeerStorage().setPeerState("bogus", true); 063 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 064 } catch (ReplicationException e) { 065 } 066 try { 067 rp.getPeerStorage().setPeerState("bogus", false); 068 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 069 } catch (ReplicationException e) { 070 } 071 072 try { 073 assertFalse(rp.addPeer("bogus")); 074 fail("Should have thrown an ReplicationException when passed a bogus peerId"); 075 } catch (ReplicationException e) { 076 } 077 078 assertNumberOfPeers(0); 079 080 // Add some peers 081 rp.getPeerStorage().addPeer(ID_ONE, 082 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 083 SyncReplicationState.NONE); 084 assertNumberOfPeers(1); 085 rp.getPeerStorage().addPeer(ID_TWO, 086 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, 087 SyncReplicationState.NONE); 088 assertNumberOfPeers(2); 089 090 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeerConfigUtil 091 .getPeerClusterConfiguration(rp.getConf(), rp.getPeerStorage().getPeerConfig(ID_ONE)))); 092 rp.getPeerStorage().removePeer(ID_ONE); 093 rp.removePeer(ID_ONE); 094 assertNumberOfPeers(1); 095 096 // Add one peer 097 rp.getPeerStorage().addPeer(ID_ONE, 098 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 099 SyncReplicationState.NONE); 100 rp.addPeer(ID_ONE); 101 assertNumberOfPeers(2); 102 assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); 103 rp.getPeerStorage().setPeerState(ID_ONE, false); 104 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 105 // manually... 106 ReplicationPeerImpl peer = rp.getPeer(ID_ONE); 107 rp.refreshPeerState(peer.getId()); 108 assertEquals(PeerState.DISABLED, peer.getPeerState()); 109 assertConnectedPeerStatus(false, ID_ONE); 110 rp.getPeerStorage().setPeerState(ID_ONE, true); 111 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 112 // manually... 113 rp.refreshPeerState(peer.getId()); 114 assertEquals(PeerState.ENABLED, peer.getPeerState()); 115 assertConnectedPeerStatus(true, ID_ONE); 116 117 // Disconnect peer 118 rp.removePeer(ID_ONE); 119 assertNumberOfPeers(2); 120 } 121 122 private void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 123 // we can first check if the value was changed in the store, if it wasn't then fail right away 124 if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { 125 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 126 } 127 while (true) { 128 if (status == rp.getPeer(peerId).isPeerEnabled()) { 129 return; 130 } 131 if (zkTimeoutCount < ZK_MAX_COUNT) { 132 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 133 + ", sleeping and trying again."); 134 Thread.sleep(ZK_SLEEP_INTERVAL); 135 } else { 136 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 137 } 138 } 139 } 140 141 private void assertNumberOfPeers(int total) throws ReplicationException { 142 assertEquals(total, rp.getPeerStorage().listPeerIds().size()); 143 } 144}