/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.HbckErrorReporter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Check and fix undeleted replication queues for removed peerId.
 * <p>
 * Two kinds of leftovers are handled: per-replicator replication queues whose peer id is no
 * longer listed by the peer storage, and peer ids that are still present in the hfile-refs queue
 * although the peer storage no longer lists them. Call {@link #checkUnDeletedQueues()} first to
 * detect and report them, then {@link #fixUnDeletedQueues()} to remove what was detected.
 */
@InterfaceAudience.Private
public class ReplicationChecker {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class);

  private final HbckErrorReporter errorReporter;
  // replicator -> ids of its queues whose peer is not listed by the peer storage any more
  private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
  // ids of peers found in the hfile-refs queue that are not listed by the peer storage any more
  private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();

  private final ReplicationPeerStorage peerStorage;
  private final ReplicationQueueStorage queueStorage;

  /**
   * Creates a checker whose peer and queue storages are obtained from
   * {@link ReplicationStorageFactory}.
   * @param conf          configuration used to obtain the {@link FileSystem} and both storages
   * @param zkw           ZooKeeper watcher handed to both storages
   * @param errorReporter reporter that receives one error per undeleted queue found
   * @throws IOException if setting up the storages fails (presumably from
   *                     {@code FileSystem.get(conf)}; confirm against the factory)
   */
  public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter)
    throws IOException {
    this.peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw, conf);
    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
    this.errorReporter = errorReporter;
  }

  /**
   * Tells whether an undeleted replication queue has been reported.
   * @return {@code true} if the error reporter's error list contains
   *         {@link HbckErrorReporter.ERROR_CODE#UNDELETED_REPLICATION_QUEUE}
   */
  public boolean hasUnDeletedQueues() {
    return errorReporter.getErrorList()
      .contains(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
  }

  /**
   * Collects, per replicator, the ids of all queues whose peer id is not among the ids currently
   * listed by the peer storage.
   * @return replicator to queue ids; replicators without such queues have no entry
   * @throws ReplicationException if the peer or queue storage cannot be read
   */
  private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException {
    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
    for (ServerName replicator : queueStorage.getListOfReplicators()) {
      for (String queueId : queueStorage.getAllQueues(replicator)) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        if (!peerIds.contains(queueInfo.getPeerId())) {
          undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
          LOG.debug(
            "Undeleted replication queue for removed peer found: "
              + "[removedPeerId={}, replicator={}, queueId={}]",
            queueInfo.getPeerId(), replicator, queueId);
        }
      }
    }
    return undeletedQueues;
  }

  /**
   * Collects the peer ids that are present in the hfile-refs queue but are not listed by the peer
   * storage.
   * @return the leftover peer ids, possibly empty
   * @throws ReplicationException if the peer or queue storage cannot be read
   */
  private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException {
    // Deliberately not named like the undeletedHFileRefsPeerIds field, so the field is not hidden.
    Set<String> leftoverPeerIds = new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
    // Copy into a HashSet so that removeAll() never falls back to List.contains() lookups.
    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
    leftoverPeerIds.removeAll(peerIds);
    if (LOG.isDebugEnabled()) {
      for (String peerId : leftoverPeerIds) {
        LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId);
      }
    }
    return leftoverPeerIds;
  }

  /**
   * Looks for undeleted replication queues and hfile-refs entries of removed peers, remembers
   * them for {@link #fixUnDeletedQueues()} and reports each one to the error reporter as
   * {@link HbckErrorReporter.ERROR_CODE#UNDELETED_REPLICATION_QUEUE}.
   * @throws ReplicationException if the peer or queue storage cannot be read
   */
  public void checkUnDeletedQueues() throws ReplicationException {
    undeletedQueueIds = getUnDeletedQueues();
    for (Map.Entry<ServerName, List<String>> entry : undeletedQueueIds.entrySet()) {
      ServerName replicator = entry.getKey();
      for (String queueId : entry.getValue()) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        String msg = "Undeleted replication queue for removed peer found: "
          + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
            replicator, queueId);
        errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
      }
    }
    undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
    for (String peerId : undeletedHFileRefsPeerIds) {
      String msg = "Undeleted replication hfile-refs queue for removed peer " + peerId + " found";
      errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
    }
  }

  /**
   * Removes everything recorded by the most recent {@link #checkUnDeletedQueues()} call: each
   * recorded queue, then the replicator itself if it has no queues left, and finally each recorded
   * peer from the hfile-refs queue. If no check has been run, both recorded collections are still
   * empty and nothing is removed.
   * @throws ReplicationException if a removal in the queue storage fails
   */
  public void fixUnDeletedQueues() throws ReplicationException {
    for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
      ServerName replicator = replicatorAndQueueIds.getKey();
      for (String queueId : replicatorAndQueueIds.getValue()) {
        queueStorage.removeQueue(replicator, queueId);
      }
      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
    }
    for (String peerId : undeletedHFileRefsPeerIds) {
      queueStorage.removePeerFromHFileRefs(peerId);
    }
  }
}