/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 * <p>
 * Usage: {@code hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]}
 * <p>
 * Arguments:
 * <ul>
 * <li>--distributed: polls each RS to dump information about its queues</li>
 * <li>--hdfs: reports HDFS usage by the replication queues (note: can be overestimated)</li>
 * </ul>
 * In the new version, the DumpReplicationQueues tool was reimplemented so that it can also read
 * replication queue information from the replication table.
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName());

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }
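  /**
   * Holder for the command line flags understood by the tool, populated by
   * {@link #parseOpts(Queue)}.
   */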
  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }
    }
    // check that --distributed is present when --hdfs is in the arguments; this is validated
    // after the loop (it used to sit behind a continue/exit and could never run), and only when
    // parsing completed normally, i.e. no pending --help was put back onto the queue
    if (args.isEmpty() && opts.isHdfs() && !opts.isDistributed()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {

    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>();
    argv.addAll(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // args remaining, print help and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("       <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
      + "Default only polls replication table.");
    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
      + " It could be overestimated if replicating to multiple peers."
      + " --distributed flag is also needed.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }
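  /**
   * Dumps the replicated tables and the peer configurations, then either polls every
   * RegionServer's queues (--distributed) or reads the queue state from the replication table.
   */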
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
    Configuration conf = getConf();
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    try {
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
          peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
        System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
        System.out.println(dumpReplicationSummary());
      } else {
        // use replication table instead
        System.out.println("Dumping replication info via replication table.");
        System.out.println(dumpReplicationViaTable(connection, conf));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Failed to dump replication queues", e);
      return -1;
    } finally {
      connection.close();
    }
  }

  /**
   * Dumps the peers, the per-RegionServer queue offsets and the HFile references from the
   * replication queue storage backed by the replication table.
   */
  public String dumpReplicationViaTable(Connection connection, Configuration conf)
    throws ReplicationException, IOException {
    StringBuilder sb = new StringBuilder();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

    // The dump info format is as follows:
    // peers:
    //   peers/1: zk1:2181:/hbase
    //   peers/1/peer-state: ENABLED
    // rs:
    //   rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
    //   rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
    // hfile-refs:
    //   hfile-refs/1/hfile1,hfile2
    //   hfile-refs/2/hfile3,hfile4
    String peersKey = "peers";
    sb.append(peersKey).append(": ").append("\n");
    List<ReplicationPeerDescription> repPeerDescs = connection.getAdmin().listReplicationPeers();
    for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
        .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
        .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
    }

    List<ReplicationQueueData> repQueueDataList = queueStorage.listAllQueues();
    String rsKey = "rs";
    sb.append(rsKey).append(": ").append("\n");
    for (ReplicationQueueData repQueueData : repQueueDataList) {
      String peerId = repQueueData.getId().getPeerId();
      for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
        .entrySet()) {
        sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
          .append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
          .append("\n");
      }
    }

    List<String> peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
    String hfileKey = "hfile-refs";
    sb.append(hfileKey).append(": ").append("\n");
    for (String peerId : peerIds) {
      List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
      sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
        .append("\n");
    }

    return sb.toString();
  }

  /**
   * Summarizes what was collected while dumping the queues: deleted queues, dead RegionServers,
   * the number of queued WALs per peer and the total size of WALs on HDFS.
   */
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
        + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
        + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in the replication queue of each peer\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append(
          "    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

  /**
   * Renders the state and configuration of every replication peer.
   */
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

  /**
   * Polls the replication queue storage for every replicator, recording dead RegionServers and
   * queues whose peer no longer exists, and renders each queue via
   * {@link #formatQueue(ServerName, Map, List, ReplicationQueueId, boolean, boolean)}.
   */
  public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
    Configuration conf) throws Exception {
    StringBuilder sb = new StringBuilder();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

    Set<ServerName> liveRegionServers =
      connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();

    List<ServerName> regionServers = queueStorage.listAllReplicators();
    if (regionServers == null || regionServers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionServer : regionServers) {
      List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);

      if (!liveRegionServers.contains(regionServer)) {
        deadRegionServers.add(regionServer.getServerName());
      }
      for (ReplicationQueueId queueId : queueIds) {
        List<String> tmpWals = new ArrayList<>();
        // wals
        AbstractFSWALProvider
          .getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
          .map(Path::toString).forEach(tmpWals::add);

        // old wals
        AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
          queueId.getServerWALsBelongTo(), URLEncoder
            .encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
          .stream().map(Path::toString).forEach(tmpWals::add);

        Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
        // keep only the wal files that still need to be replicated
        List<String> wals = new ArrayList<>();
        for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
          ReplicationGroupOffset offset = entry.getValue();
          for (String wal : tmpWals) {
            if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
              wals.add(wal);
            }
          }
        }
        Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
        if (!peerIds.contains(queueId.getPeerId())) {
          deletedQueues.add(regionServer + "/" + queueId);
          sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
        } else {
          sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

  /**
   * Renders a single replication queue: its id, peer, recovery state, the WALs still to be
   * replicated and their replication positions, plus the total WAL size on HDFS when requested.
   */
  private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
    List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
    throws Exception {
    StringBuilder sb = new StringBuilder();

    sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
    sb.append("    Queue id: " + queueId + "\n");
    sb.append("    PeerID: " + queueId.getPeerId() + "\n");
    sb.append("    Recovered: " + queueId.isRecovered() + "\n");
    // In the new version, we only record the first dead RegionServer in the queueId.
    if (queueId.getSourceServerName().isPresent()) {
      sb.append("    Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
    } else {
      sb.append("    No dead RegionServer found in this queue." + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());

    for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
      String walGroup = entry.getKey();
      ReplicationGroupOffset offset = entry.getValue();
      for (String wal : wals) {
        long position = 0;
        if (offset.getWal().equals(wal)) {
          position = offset.getOffset();
        }
        sb.append(
          "    Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
        if (position == 0) {
          sb.append("0 (not started or nothing to replicate)");
        } else if (position > 0) {
          sb.append(position);
        }
        sb.append("\n");
      }
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size in bytes of the given WALs and adds it to the running total.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }
}