001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024import java.util.Random; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.client.ClusterConnection; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 036import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 039 040/** 041 * Maintains a collection of peers to replicate to, and randomly selects a single peer to replicate 042 * to per set of data to replicate. Also handles keeping track of peer availability. 043 */ 044@InterfaceAudience.Private 045public class ReplicationSinkManager { 046 047 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class); 048 049 /** 050 * Default maximum number of times a replication sink can be reported as bad before it will no 051 * longer be provided as a sink for replication without the pool of replication sinks being 052 * refreshed. 053 */ 054 static final int DEFAULT_BAD_SINK_THRESHOLD = 3; 055 056 /** 057 * Default ratio of the total number of peer cluster region servers to consider replicating to. 058 */ 059 static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; 060 061 private final Connection conn; 062 063 private final String peerClusterId; 064 065 private final HBaseReplicationEndpoint endpoint; 066 067 // Count of "bad replication sink" reports per peer sink 068 private final Map<ServerName, Integer> badReportCounts; 069 070 // Ratio of total number of potential peer region servers to be used 071 private final float ratio; 072 073 // Maximum number of times a sink can be reported as bad before the pool of 074 // replication sinks is refreshed 075 private final int badSinkThreshold; 076 077 private final Random random; 078 079 // A timestamp of the last time the list of replication peers changed 080 private long lastUpdateToPeers; 081 082 // The current pool of sinks to which replication can be performed 083 private List<ServerName> sinks = Lists.newArrayList(); 084 085 /** 086 * Instantiate for a single replication peer cluster. 087 * @param conn connection to the peer cluster 088 * @param peerClusterId identifier of the peer cluster 089 * @param endpoint replication endpoint for inter cluster replication 090 * @param conf HBase configuration, used for determining replication source ratio and bad 091 * peer threshold 092 */ 093 public ReplicationSinkManager(ClusterConnection conn, String peerClusterId, 094 HBaseReplicationEndpoint endpoint, Configuration conf) { 095 this.conn = conn; 096 this.peerClusterId = peerClusterId; 097 this.endpoint = endpoint; 098 this.badReportCounts = Maps.newHashMap(); 099 this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); 100 this.badSinkThreshold = 101 conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); 102 this.random = new Random(); 103 } 104 105 /** 106 * Get a randomly-chosen replication sink to replicate to. 107 * @return a replication sink to replicate to 108 */ 109 public synchronized SinkPeer getReplicationSink() throws IOException { 110 if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { 111 LOG.info("Current list of sinks is out of date or empty, updating"); 112 chooseSinks(); 113 } 114 115 if (sinks.isEmpty()) { 116 throw new IOException("No replication sinks are available"); 117 } 118 ServerName serverName = sinks.get(random.nextInt(sinks.size())); 119 return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName)); 120 } 121 122 /** 123 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single 124 * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed 125 * from the pool of potential replication targets. The SinkPeer that had a failed replication 126 * attempt on it 127 */ 128 public synchronized void reportBadSink(SinkPeer sinkPeer) { 129 ServerName serverName = sinkPeer.getServerName(); 130 int badReportCount = 131 (badReportCounts.containsKey(serverName) ? badReportCounts.get(serverName) : 0) + 1; 132 badReportCounts.put(serverName, badReportCount); 133 if (badReportCount > badSinkThreshold) { 134 this.sinks.remove(serverName); 135 if (sinks.isEmpty()) { 136 chooseSinks(); 137 } 138 } 139 } 140 141 /** 142 * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a 143 * failed replication attempt on it 144 */ 145 public synchronized void reportSinkSuccess(SinkPeer sinkPeer) { 146 badReportCounts.remove(sinkPeer.getServerName()); 147 } 148 149 /** 150 * Refresh the list of sinks. 151 */ 152 public synchronized void chooseSinks() { 153 List<ServerName> slaveAddresses = endpoint.getRegionServers(); 154 if (slaveAddresses.isEmpty()) { 155 LOG.warn("No sinks available at peer. Will not be able to replicate"); 156 } 157 Collections.shuffle(slaveAddresses, random); 158 int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); 159 sinks = slaveAddresses.subList(0, numSinks); 160 lastUpdateToPeers = EnvironmentEdgeManager.currentTime(); 161 badReportCounts.clear(); 162 } 163 164 public synchronized int getNumSinks() { 165 return sinks.size(); 166 } 167 168 protected List<ServerName> getSinksForTesting() { 169 return Collections.unmodifiableList(sinks); 170 } 171 172 /** 173 * Wraps a replication region server sink to provide the ability to identify it. 174 */ 175 public static class SinkPeer { 176 private ServerName serverName; 177 private AdminService.BlockingInterface regionServer; 178 179 public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) { 180 this.serverName = serverName; 181 this.regionServer = regionServer; 182 } 183 184 ServerName getServerName() { 185 return serverName; 186 } 187 188 public AdminService.BlockingInterface getRegionServer() { 189 return regionServer; 190 } 191 192 } 193 194}