001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.favored; 019 020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE; 021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM; 022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; 023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; 024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.stream.Collectors; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.master.MasterServices; 038import org.apache.hadoop.hbase.master.RackManager; 039import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta; 040import org.apache.hadoop.hdfs.DFSConfigKeys; 041import org.apache.hadoop.hdfs.HdfsConfiguration; 042import org.apache.hadoop.net.NetUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 048import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 049import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 050 051/** 052 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and META 053 * table. Its the centralized store for all favored nodes information. All reads and updates should 054 * be done through this class. There should only be one instance of {@link FavoredNodesManager} in 055 * Master. {@link FavoredNodesPlan} and favored node information from 056 * {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except for 057 * tools that only read or fortest cases). All other classes including Favored balancers and 058 * {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any 059 * read/write/deletes to favored nodes. 060 */ 061@InterfaceAudience.Private 062public class FavoredNodesManager { 063 private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class); 064 065 private final FavoredNodesPlan globalFavoredNodesAssignmentPlan; 066 private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap; 067 private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap; 068 private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap; 069 070 private final MasterServices masterServices; 071 private final RackManager rackManager; 072 073 /** 074 * Datanode port to be used for Favored Nodes. 075 */ 076 private int datanodeDataTransferPort; 077 078 public FavoredNodesManager(MasterServices masterServices) { 079 this.masterServices = masterServices; 080 this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); 081 this.primaryRSToRegionMap = new HashMap<>(); 082 this.secondaryRSToRegionMap = new HashMap<>(); 083 this.teritiaryRSToRegionMap = new HashMap<>(); 084 this.rackManager = new RackManager(masterServices.getConfiguration()); 085 } 086 087 public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) { 088 // Add snapshot to structures made on creation. Current structures may have picked 089 // up data between construction and the scan of meta needed before this method 090 // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time" 091 this.globalFavoredNodesAssignmentPlan 092 .updateFavoredNodesMap(snapshot.getExistingAssignmentPlan()); 093 primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap()); 094 secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap()); 095 teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap()); 096 datanodeDataTransferPort = getDataNodePort(); 097 } 098 099 public int getDataNodePort() { 100 HdfsConfiguration.init(); 101 102 Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration()); 103 104 int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, 105 DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); 106 LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort); 107 return dnPort; 108 } 109 110 public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) { 111 return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo); 112 } 113 114 /* 115 * Favored nodes are not applicable for system tables. We will use this to check before we apply 116 * any favored nodes logic on a region. 117 */ 118 public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) { 119 return !regionInfo.getTable().isSystemTable(); 120 } 121 122 /** 123 * Filter and return regions for which favored nodes is not applicable. 124 * @return set of regions for which favored nodes is not applicable 125 */ 126 public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) { 127 return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet()); 128 } 129 130 /* 131 * This should only be used when sending FN information to the region servers. Instead of sending 132 * the region server port, we use the datanode port. This helps in centralizing the DN port logic 133 * in Master. The RS uses the port from the favored node list as hints. 134 */ 135 public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) { 136 if (getFavoredNodes(regionInfo) == null) { 137 return null; 138 } 139 140 List<ServerName> fnWithDNPort = Lists.newArrayList(); 141 for (ServerName sn : getFavoredNodes(regionInfo)) { 142 fnWithDNPort 143 .add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort, NON_STARTCODE)); 144 } 145 return fnWithDNPort; 146 } 147 148 public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap) 149 throws IOException { 150 151 Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>(); 152 for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) { 153 RegionInfo regionInfo = entry.getKey(); 154 List<ServerName> servers = entry.getValue(); 155 156 /* 157 * None of the following error conditions should happen. If it does, there is an issue with 158 * favored nodes generation or the regions its called on. 159 */ 160 if (servers.size() != Sets.newHashSet(servers).size()) { 161 throw new IOException("Duplicates found: " + servers); 162 } 163 164 if (!isFavoredNodeApplicable(regionInfo)) { 165 throw new IOException("Can't update FN for a un-applicable region: " 166 + regionInfo.getRegionNameAsString() + " with " + servers); 167 } 168 169 if (servers.size() != FAVORED_NODES_NUM) { 170 throw new IOException( 171 "At least " + FAVORED_NODES_NUM + " favored nodes should be present for region : " 172 + regionInfo.getEncodedName() + " current FN servers:" + servers); 173 } 174 175 List<ServerName> serversWithNoStartCodes = Lists.newArrayList(); 176 for (ServerName sn : servers) { 177 if (sn.getStartcode() == NON_STARTCODE) { 178 serversWithNoStartCodes.add(sn); 179 } else { 180 serversWithNoStartCodes 181 .add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE)); 182 } 183 } 184 regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes); 185 } 186 187 // Lets do a bulk update to meta since that reduces the RPC's 188 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, 189 masterServices.getConnection()); 190 deleteFavoredNodesForRegions(regionToFavoredNodes.keySet()); 191 192 for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) { 193 RegionInfo regionInfo = entry.getKey(); 194 List<ServerName> serversWithNoStartCodes = entry.getValue(); 195 globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes); 196 addToReplicaLoad(regionInfo, serversWithNoStartCodes); 197 } 198 } 199 200 private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) { 201 ServerName serverToUse = 202 ServerName.valueOf(servers.get(PRIMARY.ordinal()).getAddress().toString(), NON_STARTCODE); 203 List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse); 204 if (regionList == null) { 205 regionList = new ArrayList<>(); 206 } 207 regionList.add(hri); 208 primaryRSToRegionMap.put(serverToUse, regionList); 209 210 serverToUse = ServerName.valueOf(servers.get(SECONDARY.ordinal()).getAddress(), NON_STARTCODE); 211 regionList = secondaryRSToRegionMap.get(serverToUse); 212 if (regionList == null) { 213 regionList = new ArrayList<>(); 214 } 215 regionList.add(hri); 216 secondaryRSToRegionMap.put(serverToUse, regionList); 217 218 serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getAddress(), NON_STARTCODE); 219 regionList = teritiaryRSToRegionMap.get(serverToUse); 220 if (regionList == null) { 221 regionList = new ArrayList<>(); 222 } 223 regionList.add(hri); 224 teritiaryRSToRegionMap.put(serverToUse, regionList); 225 } 226 227 /* 228 * Get the replica count for the servers provided. For each server, replica count includes three 229 * counts for primary, secondary and tertiary. If a server is the primary favored node for 10 230 * regions, secondary for 5 and tertiary for 1, then the list would be [10, 5, 1]. If the server 231 * is newly added to the cluster is not a favored node for any region, the replica count would be 232 * [0, 0, 0]. 233 */ 234 public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) { 235 Map<ServerName, List<Integer>> result = Maps.newHashMap(); 236 for (ServerName sn : servers) { 237 ServerName serverWithNoStartCode = ServerName.valueOf(sn.getAddress(), NON_STARTCODE); 238 List<Integer> countList = Lists.newArrayList(); 239 if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 240 countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size()); 241 } else { 242 countList.add(0); 243 } 244 if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 245 countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size()); 246 } else { 247 countList.add(0); 248 } 249 if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 250 countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size()); 251 } else { 252 countList.add(0); 253 } 254 result.put(sn, countList); 255 } 256 return result; 257 } 258 259 public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) { 260 List<ServerName> favNodes = getFavoredNodes(regionInfo); 261 if (favNodes != null) { 262 if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) { 263 primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo); 264 } 265 if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) { 266 secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo); 267 } 268 if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) { 269 teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo); 270 } 271 globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo); 272 } 273 } 274 275 public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) { 276 for (RegionInfo regionInfo : regionInfoList) { 277 deleteFavoredNodesForRegion(regionInfo); 278 } 279 } 280 281 public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) { 282 Set<RegionInfo> regionInfos = Sets.newHashSet(); 283 284 ServerName serverToUse = ServerName.valueOf(serverName.getAddress(), NON_STARTCODE); 285 if (primaryRSToRegionMap.containsKey(serverToUse)) { 286 regionInfos.addAll(primaryRSToRegionMap.get(serverToUse)); 287 } 288 if (secondaryRSToRegionMap.containsKey(serverToUse)) { 289 regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse)); 290 } 291 if (teritiaryRSToRegionMap.containsKey(serverToUse)) { 292 regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse)); 293 } 294 return regionInfos; 295 } 296 297 public RackManager getRackManager() { 298 return rackManager; 299 } 300}