001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.net.URI;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.EnumSet;
026import java.util.List;
027import java.util.Map;
028import java.util.UUID;
029import java.util.concurrent.ThreadLocalRandom;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.client.AsyncClusterConnection;
036import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
037import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
038import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.util.FutureUtils;
041import org.apache.hadoop.hbase.util.ReservoirSample;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
047
048/**
049 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase
050 * cluster.
051 */
052@InterfaceAudience.Private
053public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
054  implements Abortable {
055
056  private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
057
058  protected Configuration conf;
059
060  private URI clusterURI;
061
062  private final Object connLock = new Object();
063
064  private volatile AsyncClusterConnection conn;
065
066  /**
067   * Default maximum number of times a replication sink can be reported as bad before it will no
068   * longer be provided as a sink for replication without the pool of replication sinks being
069   * refreshed.
070   */
071  public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
072
073  /**
074   * Default ratio of the total number of peer cluster region servers to consider replicating to.
075   */
076  public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
077
078  // Ratio of total number of potential peer region servers to be used
079  private float ratio;
080
081  // Maximum number of times a sink can be reported as bad before the pool of
082  // replication sinks is refreshed
083  private int badSinkThreshold;
084  // Count of "bad replication sink" reports per peer sink
085  private Map<ServerName, Integer> badReportCounts;
086
087  private List<ServerName> sinkServers = new ArrayList<>(0);
088
089  /**
090   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
091   * Connection implementations, or initialize it in a different way, so defining createConnection
092   * as protected for possible overridings.
093   */
094  protected AsyncClusterConnection createConnection(URI clusterURI, Configuration conf)
095    throws IOException {
096    return ClusterConnectionFactory.createAsyncClusterConnection(clusterURI, conf, null,
097      User.getCurrent());
098  }
099
100  @Override
101  public void init(Context context) throws IOException {
102    super.init(context);
103    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
104    this.clusterURI = ConnectionRegistryFactory
105      .tryParseAsConnectionURI(context.getReplicationPeer().getPeerConfig().getClusterKey());
106    this.ratio =
107      ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
108    this.badSinkThreshold =
109      ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
110    this.badReportCounts = Maps.newHashMap();
111  }
112
113  private void disconnect() {
114    synchronized (connLock) {
115      if (this.conn != null) {
116        try {
117          this.conn.close();
118          this.conn = null;
119        } catch (IOException e) {
120          LOG.warn("{} Failed to close the connection", ctx.getPeerId());
121        }
122      }
123    }
124  }
125
126  @Override
127  public void start() {
128    startAsync();
129  }
130
131  @Override
132  public void stop() {
133    stopAsync();
134  }
135
136  @Override
137  protected void doStart() {
138    notifyStarted();
139  }
140
141  @Override
142  protected void doStop() {
143    disconnect();
144    notifyStopped();
145  }
146
147  @Override
148  public UUID getPeerUUID() {
149    try {
150      AsyncClusterConnection conn = connect();
151      String clusterId = FutureUtils
152        .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
153        .getClusterId();
154      return UUID.fromString(clusterId);
155    } catch (IOException e) {
156      LOG.warn("Failed to get cluster id for cluster", e);
157      return null;
158    }
159  }
160
161  // do not call this method in doStart method, only initialize the connection to remote cluster
162  // when you actually wants to make use of it. The problem here is that, starting the replication
163  // endpoint is part of the region server initialization work, so if the peer cluster is fully
164  // down and we can not connect to it, we will cause the initialization to fail and crash the
165  // region server, as we need the cluster id while setting up the AsyncClusterConnection, which
166  // needs to at least connect to zookeeper or some other servers in the peer cluster based on
167  // different connection registry implementation
168  private AsyncClusterConnection connect() throws IOException {
169    AsyncClusterConnection c = this.conn;
170    if (c != null) {
171      return c;
172    }
173    synchronized (connLock) {
174      c = this.conn;
175      if (c != null) {
176        return c;
177      }
178      c = createConnection(clusterURI, conf);
179      conn = c;
180    }
181    return c;
182  }
183
184  @Override
185  public void abort(String why, Throwable e) {
186    LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
187      + " was aborted for the following reason(s):" + why, e);
188  }
189
190  @Override
191  public boolean isAborted() {
192    // Currently this is never "Aborted", we just log when the abort method is called.
193    return false;
194  }
195
196  /**
197   * Get the list of all the region servers from the specified peer
198   * @return list of region server addresses or an empty list if the slave is unavailable
199   */
200  // will be overrided in tests so protected
201  protected Collection<ServerName> fetchPeerAddresses() {
202    try {
203      return FutureUtils.get(connect().getAdmin().getRegionServers(true));
204    } catch (IOException e) {
205      LOG.debug("Fetch peer addresses failed", e);
206      return Collections.emptyList();
207    }
208  }
209
210  protected synchronized void chooseSinks() {
211    Collection<ServerName> slaveAddresses = fetchPeerAddresses();
212    if (slaveAddresses.isEmpty()) {
213      LOG.warn("No sinks available at peer. Will not be able to replicate");
214      this.sinkServers = Collections.emptyList();
215    } else {
216      int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
217      ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
218      sample.add(slaveAddresses.iterator());
219      this.sinkServers = sample.getSamplingResult();
220    }
221    badReportCounts.clear();
222  }
223
224  protected synchronized int getNumSinks() {
225    return sinkServers.size();
226  }
227
228  /**
229   * Get a randomly-chosen replication sink to replicate to.
230   * @return a replication sink to replicate to
231   */
232  protected synchronized SinkPeer getReplicationSink() throws IOException {
233    if (sinkServers.isEmpty()) {
234      LOG.info("Current list of sinks is out of date or empty, updating");
235      chooseSinks();
236    }
237    if (sinkServers.isEmpty()) {
238      throw new IOException("No replication sinks are available");
239    }
240    ServerName serverName =
241      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
242    return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
243  }
244
245  /**
246   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
247   * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed
248   * from the pool of potential replication targets.
249   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
250   */
251  protected synchronized void reportBadSink(SinkPeer sinkPeer) {
252    ServerName serverName = sinkPeer.getServerName();
253    int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
254    if (badReportCount > badSinkThreshold) {
255      this.sinkServers.remove(serverName);
256      if (sinkServers.isEmpty()) {
257        chooseSinks();
258      }
259    }
260  }
261
262  /**
263   * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a
264   * failed replication attempt on it
265   */
266  protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
267    badReportCounts.remove(sinkPeer.getServerName());
268  }
269
270  List<ServerName> getSinkServers() {
271    return sinkServers;
272  }
273
274  /**
275   * Wraps a replication region server sink to provide the ability to identify it.
276   */
277  public static class SinkPeer {
278    private ServerName serverName;
279    private AsyncRegionServerAdmin regionServer;
280
281    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
282      this.serverName = serverName;
283      this.regionServer = regionServer;
284    }
285
286    ServerName getServerName() {
287      return serverName;
288    }
289
290    public AsyncRegionServerAdmin getRegionServer() {
291      return regionServer;
292    }
293  }
294}