001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.net.URI;
026import java.util.Collection;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.client.AsyncClusterConnection;
033import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
034import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
035import org.apache.hadoop.hbase.testclassification.ReplicationTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.junit.Before;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043
044@Category({ ReplicationTests.class, SmallTests.class })
045public class TestHBaseReplicationEndpoint {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class);
050
051  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
052
053  private HBaseReplicationEndpoint endpoint;
054
055  @Before
056  public void setUp() throws Exception {
057    ReplicationPeer replicationPeer = mock(ReplicationPeer.class);
058    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
059    when(replicationPeer.getPeerConfig()).thenReturn(peerConfig);
060    when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase");
061    ReplicationEndpoint.Context context =
062      new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
063        null, null, replicationPeer, null, null, null);
064    endpoint = new DummyHBaseReplicationEndpoint();
065    endpoint.init(context);
066  }
067
068  @Test
069  public void testChooseSinks() {
070    List<ServerName> serverNames = Lists.newArrayList();
071    int totalServers = 20;
072    for (int i = 0; i < totalServers; i++) {
073      serverNames.add(mock(ServerName.class));
074    }
075    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
076    endpoint.chooseSinks();
077    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
078    assertEquals(expected, endpoint.getNumSinks());
079  }
080
081  @Test
082  public void testChooseSinksLessThanRatioAvailable() {
083    List<ServerName> serverNames =
084      Lists.newArrayList(mock(ServerName.class), mock(ServerName.class));
085    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
086    endpoint.chooseSinks();
087    assertEquals(1, endpoint.getNumSinks());
088  }
089
090  @Test
091  public void testReportBadSink() {
092    ServerName serverNameA = mock(ServerName.class);
093    ServerName serverNameB = mock(ServerName.class);
094    ((DummyHBaseReplicationEndpoint) endpoint)
095      .setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
096    endpoint.chooseSinks();
097    // Sanity check
098    assertEquals(1, endpoint.getNumSinks());
099
100    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
101    endpoint.reportBadSink(sinkPeer);
102    // Just reporting a bad sink once shouldn't have an effect
103    assertEquals(1, endpoint.getNumSinks());
104  }
105
106  /**
107   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be
108   * replicated to anymore.
109   */
110  @Test
111  public void testReportBadSinkPastThreshold() {
112    List<ServerName> serverNames = Lists.newArrayList();
113    int totalServers = 30;
114    for (int i = 0; i < totalServers; i++) {
115      serverNames.add(mock(ServerName.class));
116    }
117    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
118    endpoint.chooseSinks();
119    // Sanity check
120    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
121    assertEquals(expected, endpoint.getNumSinks());
122
123    ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
124    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
125    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
126      endpoint.reportBadSink(sinkPeer);
127    }
128    // Reporting a bad sink more than the threshold count should remove it
129    // from the list of potential sinks
130    assertEquals(expected - 1, endpoint.getNumSinks());
131
132    // now try a sink that has some successes
133    ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
134    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
135    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
136      endpoint.reportBadSink(sinkPeer);
137    }
138    endpoint.reportSinkSuccess(sinkPeer); // one success
139    endpoint.reportBadSink(sinkPeer);
140    // did not remove the sink, since we had one successful try
141    assertEquals(expected - 1, endpoint.getNumSinks());
142
143    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
144      endpoint.reportBadSink(sinkPeer);
145    }
146    // still not remove, since the success reset the counter
147    assertEquals(expected - 1, endpoint.getNumSinks());
148    endpoint.reportBadSink(sinkPeer);
149    // but we exhausted the tries
150    assertEquals(expected - 2, endpoint.getNumSinks());
151  }
152
153  @Test
154  public void testReportBadSinkDownToZeroSinks() {
155    List<ServerName> serverNames = Lists.newArrayList();
156    int totalServers = 4;
157    for (int i = 0; i < totalServers; i++) {
158      serverNames.add(mock(ServerName.class));
159    }
160    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
161    endpoint.chooseSinks();
162    // Sanity check
163    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
164    assertEquals(expected, endpoint.getNumSinks());
165
166    ServerName serverNameA = endpoint.getSinkServers().get(0);
167    ServerName serverNameB = endpoint.getSinkServers().get(1);
168
169    serverNames.remove(serverNameA);
170    serverNames.remove(serverNameB);
171
172    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
173    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
174
175    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
176      endpoint.reportBadSink(sinkPeerA);
177      endpoint.reportBadSink(sinkPeerB);
178    }
179
180    // We've gone down to 0 good sinks, so the replication sinks
181    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
182    // reported as bad.
183    expected =
184      (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
185    assertEquals(expected, endpoint.getNumSinks());
186  }
187
188  private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
189
190    List<ServerName> regionServers;
191
192    public void setRegionServers(List<ServerName> regionServers) {
193      this.regionServers = regionServers;
194    }
195
196    @Override
197    protected Collection<ServerName> fetchPeerAddresses() {
198      return regionServers;
199    }
200
201    @Override
202    public boolean replicate(ReplicateContext replicateContext) {
203      return false;
204    }
205
206    @Override
207    public AsyncClusterConnection createConnection(URI clusterURI, Configuration conf)
208      throws IOException {
209      return null;
210    }
211  }
212}