/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 * <p>
 * Implements {@link PropagatingConfigurationObserver} mainly to register {@link ReplicationPeers},
 * so that the replication peer storage can be recreated when the configuration changes.
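 * <p>
 * A minimal sketch of the lifecycle as driven by a hosting region server; the {@code server},
 * {@code fs}, {@code logDir}, {@code oldLogDir} and {@code walFactory} references below are
 * placeholders supplied by the host, not created by this class:
 *
 * <pre>{@code
 * Replication replication = new Replication();
 * replication.initialize(server, fs, logDir, oldLogDir, walFactory);
 * replication.startReplicationService(); // starts sources, sink and the stats task
 * // ... region server lifetime ...
 * replication.stopReplicationService();  // joins source threads and stops the sink
 * }</pre>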
 */
@InterfaceAudience.Private
public class Replication
  implements ReplicationSourceService, ReplicationSinkService, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private volatile Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;

  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor
   */
  public Replication() {
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true).build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (
        conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()
      ) {
        throw new IllegalArgumentException(
          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
            + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
      }
    }

    try {
      this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(),
        server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
    } catch (Exception e) {
      throw new IOException("Failed replication handler create", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    this.globalMetricsSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
      this.server, fs, logDir, oldLogDir, clusterId, walFactory, globalMetricsSource);
    // Get the user-space WAL provider
    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    }
    this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
  }

  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carry the list of log entries down to the sink.
   * @param entries                    list of entries to replicate
   * @param cells                      The data -- the cells -- that <code>entries</code> describes
   *                                   (the entries do not contain the Cells we are replicating;
   *                                   they are passed here on the side in this CellScanner).
   * @param replicationClusterId       Id which will uniquely identify the source cluster's FS
   *                                   client configurations in the replication configuration
   *                                   directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the replication sources, creates the sink and
   * schedules the periodic statistics task.
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
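   * <p>
   * The period is read from the {@code replication.stats.thread.period.seconds} setting
   * (default 300 seconds) in {@code initialize}, and the task is scheduled by
   * {@code startReplicationService()}.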
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
      ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void registerChildren(ConfigurationManager manager) {
    manager.registerObserver(replicationPeers);
  }

  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    manager.deregisterObserver(replicationPeers);
  }
}