001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.ChoreService; 037import org.apache.hadoop.hbase.CoordinatedStateManager; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.Server; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.client.AsyncClusterConnection; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage; 045import org.apache.hadoop.hbase.replication.ReplicationException; 046import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 047import org.apache.hadoop.hbase.replication.ReplicationQueueId; 048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.CommonFSUtils; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.JsonMapper; 054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 055import org.apache.hadoop.hbase.wal.WALFactory; 056import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 057import org.apache.hadoop.util.Tool; 058import org.apache.hadoop.util.ToolRunner; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.apache.zookeeper.KeeperException; 061 062/** 063 * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this 064 * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool 065 * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase 066 * crashes 067 * 068 * <pre> 069 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp 070 * </pre> 071 */ 072@InterfaceAudience.Private 073public class ReplicationSyncUp extends Configured implements Tool { 074 075 public static class ReplicationSyncUpToolInfo { 076 077 private long startTimeMs; 078 079 public ReplicationSyncUpToolInfo() { 080 } 081 082 public ReplicationSyncUpToolInfo(long startTimeMs) { 083 this.startTimeMs = startTimeMs; 084 } 085 086 public long getStartTimeMs() { 087 return startTimeMs; 088 } 089 090 public void setStartTimeMs(long startTimeMs) { 091 this.startTimeMs = startTimeMs; 092 } 093 } 094 095 // For storing the information used to skip replicating some wals after the cluster is back online 096 public static final String INFO_DIR = "ReplicationSyncUp"; 097 098 public static final String INFO_FILE = "info"; 099 100 private static final long SLEEP_TIME = 10000; 101 102 /** 103 * Main program 104 */ 105 public static void main(String[] args) throws Exception { 106 int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args); 107 System.exit(ret); 108 } 109 110 // Find region servers under wal directory 111 // Here we only care about the region servers which may still be alive, as we need to add 112 // replications for them if missing. The dead region servers which have already been processed 113 // fully do not need to add their replication queues again, as the operation has already been done 114 // in SCP. 115 private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException { 116 FileStatus[] statuses; 117 try { 118 statuses = walFs.listStatus(walDir); 119 } catch (FileNotFoundException e) { 120 System.out.println("WAL directory " + walDir + " does not exists, ignore"); 121 return Collections.emptySet(); 122 } 123 Set<ServerName> regionServers = new HashSet<>(); 124 for (FileStatus status : statuses) { 125 // All wal files under the walDir is within its region server's directory 126 if (!status.isDirectory()) { 127 continue; 128 } 129 ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath()); 130 if (sn != null) { 131 regionServers.add(sn); 132 } 133 } 134 return regionServers; 135 } 136 137 private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer, 138 Set<String> peerIds) throws ReplicationException { 139 Set<String> existingQueuePeerIds = new HashSet<>(); 140 List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer); 141 for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) { 142 ReplicationQueueId queueId = iter.next(); 143 if (!queueId.isRecovered()) { 144 existingQueuePeerIds.add(queueId.getPeerId()); 145 } 146 } 147 148 for (String peerId : peerIds) { 149 if (!existingQueuePeerIds.contains(peerId)) { 150 ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId); 151 System.out.println("Add replication queue " + queueId + " for claiming"); 152 storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN, 153 Collections.emptyMap()); 154 } 155 } 156 } 157 158 private void addMissingReplicationQueues(ReplicationQueueStorage storage, 159 Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException { 160 for (ServerName regionServer : regionServers) { 161 addMissingReplicationQueues(storage, regionServer, peerIds); 162 } 163 } 164 165 // When using this tool, usually the source cluster is unhealthy, so we should try to claim the 166 // replication queues for the dead region servers first and then replicate the data out. 167 private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers) 168 throws ReplicationException, KeeperException, IOException { 169 // union the region servers from both places, i.e, from the wal directory, and the records in 170 // replication queue storage. 171 Set<ServerName> replicators = new HashSet<>(regionServers); 172 ReplicationQueueStorage queueStorage = mgr.getQueueStorage(); 173 replicators.addAll(queueStorage.listAllReplicators()); 174 FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf()); 175 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); 176 for (ServerName sn : replicators) { 177 List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn); 178 System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); 179 // record the rs name, so when master restarting, we will skip claiming its replication queue 180 fs.createNewFile(new Path(infoDir, sn.getServerName())); 181 for (ReplicationQueueId queueId : replicationQueues) { 182 mgr.claimQueue(queueId, true); 183 } 184 } 185 } 186 187 private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException { 188 // Record the info of this run. Currently only record the time we run the job. We will use this 189 // timestamp to clean up the data for last sequence ids and hfile refs in replication queue 190 // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. 191 ReplicationSyncUpToolInfo info = 192 new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); 193 String json = JsonMapper.writeObjectAsString(info); 194 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); 195 try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) { 196 out.write(Bytes.toBytes(json)); 197 } 198 } 199 200 private static boolean parseOpts(String args[]) { 201 LinkedList<String> argv = new LinkedList<>(); 202 argv.addAll(Arrays.asList(args)); 203 String cmd = null; 204 while ((cmd = argv.poll()) != null) { 205 if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { 206 printUsageAndExit(null, 0); 207 } 208 if (cmd.equals("-f")) { 209 return true; 210 } 211 if (!argv.isEmpty()) { 212 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 213 } 214 } 215 return false; 216 } 217 218 private static void printUsageAndExit(final String message, final int exitCode) { 219 printUsage(message); 220 System.exit(exitCode); 221 } 222 223 private static void printUsage(final String message) { 224 if (message != null && message.length() > 0) { 225 System.err.println(message); 226 } 227 System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\"); 228 System.err.println(" <OPTIONS> [-D<property=value>]*"); 229 System.err.println(); 230 System.err.println("General Options:"); 231 System.err.println(" -h|--h|--help Show this help and exit."); 232 System.err 233 .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. " 234 + "See HBASE-27623 for details."); 235 } 236 237 @Override 238 public int run(String[] args) throws Exception { 239 Abortable abortable = new Abortable() { 240 241 private volatile boolean abort = false; 242 243 @Override 244 public void abort(String why, Throwable e) { 245 if (isAborted()) { 246 return; 247 } 248 abort = true; 249 System.err.println("Aborting because of " + why); 250 e.printStackTrace(); 251 System.exit(1); 252 } 253 254 @Override 255 public boolean isAborted() { 256 return abort; 257 } 258 }; 259 boolean isForce = parseOpts(args); 260 Configuration conf = getConf(); 261 try (ZKWatcher zkw = new ZKWatcher(conf, 262 "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { 263 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 264 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 265 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 266 Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); 267 268 System.out.println("Start Replication Server"); 269 writeInfoFile(fs, isForce); 270 Replication replication = new Replication(); 271 // use offline table replication queue storage 272 getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, 273 OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class); 274 DummyServer server = new DummyServer(getConf(), zkw); 275 replication 276 .initialize(server, fs, new Path(logDir, server.toString()), oldLogDir, 277 new WALFactory(conf, 278 ServerName.valueOf( 279 getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()), 280 null)); 281 ReplicationSourceManager manager = replication.getReplicationManager(); 282 manager.init(); 283 Set<ServerName> regionServers = listRegionServers(fs, logDir); 284 addMissingReplicationQueues(manager.getQueueStorage(), regionServers, 285 manager.getReplicationPeers().getAllPeerIds()); 286 claimReplicationQueues(manager, regionServers); 287 while (manager.activeFailoverTaskCount() > 0) { 288 Thread.sleep(SLEEP_TIME); 289 } 290 while (manager.getOldSources().size() > 0) { 291 Thread.sleep(SLEEP_TIME); 292 } 293 manager.join(); 294 } catch (InterruptedException e) { 295 System.err.println("didn't wait long enough:" + e); 296 return -1; 297 } 298 return 0; 299 } 300 301 private static final class DummyServer implements Server { 302 private final Configuration conf; 303 private final String hostname; 304 private final ZKWatcher zkw; 305 private volatile boolean abort = false; 306 307 DummyServer(Configuration conf, ZKWatcher zkw) { 308 // a unique name in case the first run fails 309 hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org"; 310 this.conf = conf; 311 this.zkw = zkw; 312 } 313 314 @Override 315 public Configuration getConfiguration() { 316 return conf; 317 } 318 319 @Override 320 public ZKWatcher getZooKeeper() { 321 return zkw; 322 } 323 324 @Override 325 public CoordinatedStateManager getCoordinatedStateManager() { 326 return null; 327 } 328 329 @Override 330 public ServerName getServerName() { 331 return ServerName.valueOf(hostname, 1234, 1L); 332 } 333 334 @Override 335 public void abort(String why, Throwable e) { 336 if (isAborted()) { 337 return; 338 } 339 abort = true; 340 System.err.println("Aborting because of " + why); 341 e.printStackTrace(); 342 System.exit(1); 343 } 344 345 @Override 346 public boolean isAborted() { 347 return abort; 348 } 349 350 @Override 351 public void stop(String why) { 352 } 353 354 @Override 355 public boolean isStopped() { 356 return false; 357 } 358 359 @Override 360 public Connection getConnection() { 361 return null; 362 } 363 364 @Override 365 public ChoreService getChoreService() { 366 return null; 367 } 368 369 @Override 370 public FileSystem getFileSystem() { 371 return null; 372 } 373 374 @Override 375 public boolean isStopping() { 376 return false; 377 } 378 379 @Override 380 public Connection createConnection(Configuration conf) throws IOException { 381 return null; 382 } 383 384 @Override 385 public AsyncClusterConnection getAsyncClusterConnection() { 386 return null; 387 } 388 } 389}