001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.when; 023 024import java.io.IOException; 025import java.net.URI; 026import java.util.Collection; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.client.AsyncClusterConnection; 033import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; 034import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; 035import org.apache.hadoop.hbase.testclassification.ReplicationTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.junit.Before; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 043 044@Category({ ReplicationTests.class, SmallTests.class }) 045public class TestHBaseReplicationEndpoint { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class); 050 051 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 052 053 private HBaseReplicationEndpoint endpoint; 054 055 @Before 056 public void setUp() throws Exception { 057 ReplicationPeer replicationPeer = mock(ReplicationPeer.class); 058 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 059 when(replicationPeer.getPeerConfig()).thenReturn(peerConfig); 060 when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase"); 061 ReplicationEndpoint.Context context = 062 new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null, 063 null, null, replicationPeer, null, null, null); 064 endpoint = new DummyHBaseReplicationEndpoint(); 065 endpoint.init(context); 066 } 067 068 @Test 069 public void testChooseSinks() { 070 List<ServerName> serverNames = Lists.newArrayList(); 071 int totalServers = 20; 072 for (int i = 0; i < totalServers; i++) { 073 serverNames.add(mock(ServerName.class)); 074 } 075 ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); 076 endpoint.chooseSinks(); 077 int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); 078 assertEquals(expected, endpoint.getNumSinks()); 079 } 080 081 @Test 082 public void testChooseSinksLessThanRatioAvailable() { 083 List<ServerName> serverNames = 084 Lists.newArrayList(mock(ServerName.class), mock(ServerName.class)); 085 ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); 086 endpoint.chooseSinks(); 087 assertEquals(1, endpoint.getNumSinks()); 088 } 089 090 @Test 091 public void testReportBadSink() { 092 ServerName serverNameA = mock(ServerName.class); 093 ServerName serverNameB = mock(ServerName.class); 094 ((DummyHBaseReplicationEndpoint) endpoint) 095 .setRegionServers(Lists.newArrayList(serverNameA, serverNameB)); 096 endpoint.chooseSinks(); 097 // Sanity check 098 assertEquals(1, endpoint.getNumSinks()); 099 100 SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); 101 endpoint.reportBadSink(sinkPeer); 102 // Just reporting a bad sink once shouldn't have an effect 103 assertEquals(1, endpoint.getNumSinks()); 104 } 105 106 /** 107 * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be 108 * replicated to anymore. 109 */ 110 @Test 111 public void testReportBadSinkPastThreshold() { 112 List<ServerName> serverNames = Lists.newArrayList(); 113 int totalServers = 30; 114 for (int i = 0; i < totalServers; i++) { 115 serverNames.add(mock(ServerName.class)); 116 } 117 ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); 118 endpoint.chooseSinks(); 119 // Sanity check 120 int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); 121 assertEquals(expected, endpoint.getNumSinks()); 122 123 ServerName badSinkServer0 = endpoint.getSinkServers().get(0); 124 SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class)); 125 for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { 126 endpoint.reportBadSink(sinkPeer); 127 } 128 // Reporting a bad sink more than the threshold count should remove it 129 // from the list of potential sinks 130 assertEquals(expected - 1, endpoint.getNumSinks()); 131 132 // now try a sink that has some successes 133 ServerName badSinkServer1 = endpoint.getSinkServers().get(0); 134 sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class)); 135 for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { 136 endpoint.reportBadSink(sinkPeer); 137 } 138 endpoint.reportSinkSuccess(sinkPeer); // one success 139 endpoint.reportBadSink(sinkPeer); 140 // did not remove the sink, since we had one successful try 141 assertEquals(expected - 1, endpoint.getNumSinks()); 142 143 for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) { 144 endpoint.reportBadSink(sinkPeer); 145 } 146 // still not remove, since the success reset the counter 147 assertEquals(expected - 1, endpoint.getNumSinks()); 148 endpoint.reportBadSink(sinkPeer); 149 // but we exhausted the tries 150 assertEquals(expected - 2, endpoint.getNumSinks()); 151 } 152 153 @Test 154 public void testReportBadSinkDownToZeroSinks() { 155 List<ServerName> serverNames = Lists.newArrayList(); 156 int totalServers = 4; 157 for (int i = 0; i < totalServers; i++) { 158 serverNames.add(mock(ServerName.class)); 159 } 160 ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames); 161 endpoint.chooseSinks(); 162 // Sanity check 163 int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); 164 assertEquals(expected, endpoint.getNumSinks()); 165 166 ServerName serverNameA = endpoint.getSinkServers().get(0); 167 ServerName serverNameB = endpoint.getSinkServers().get(1); 168 169 serverNames.remove(serverNameA); 170 serverNames.remove(serverNameB); 171 172 SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); 173 SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); 174 175 for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { 176 endpoint.reportBadSink(sinkPeerA); 177 endpoint.reportBadSink(sinkPeerB); 178 } 179 180 // We've gone down to 0 good sinks, so the replication sinks 181 // should have been refreshed now, so out of 4 servers, 2 are not considered as they are 182 // reported as bad. 183 expected = 184 (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO); 185 assertEquals(expected, endpoint.getNumSinks()); 186 } 187 188 private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint { 189 190 List<ServerName> regionServers; 191 192 public void setRegionServers(List<ServerName> regionServers) { 193 this.regionServers = regionServers; 194 } 195 196 @Override 197 protected Collection<ServerName> fetchPeerAddresses() { 198 return regionServers; 199 } 200 201 @Override 202 public boolean replicate(ReplicateContext replicateContext) { 203 return false; 204 } 205 206 @Override 207 public AsyncClusterConnection createConnection(URI clusterURI, Configuration conf) 208 throws IOException { 209 return null; 210 } 211 } 212}