001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Executors; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HDFSBlocksDistribution; 034import org.apache.hadoop.hbase.RegionMetrics; 035import org.apache.hadoop.hbase.ServerMetrics; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.master.MasterServices; 041import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 042import org.apache.hadoop.hbase.regionserver.HRegion; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 049import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 050import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 051import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures; 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; 054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; 055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; 056import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 057 058/** 059 * This will find where data for a region is located in HDFS. It ranks {@link ServerName}'s by the 060 * size of the store files they are holding for a given region. 061 */ 062@InterfaceAudience.Private 063class RegionLocationFinder { 064 private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class); 065 private static final long CACHE_TIME = 240 * 60 * 1000; 066 private static final float EPSILON = 0.0001f; 067 private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = 068 new HDFSBlocksDistribution(); 069 070 private Configuration conf; 071 private volatile ClusterMetrics status; 072 private MasterServices services; 073 private final ListeningExecutorService executor; 074 // Do not scheduleFullRefresh at master startup 075 private long lastFullRefresh = EnvironmentEdgeManager.currentTime(); 076 077 private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader = 078 new CacheLoader<RegionInfo, HDFSBlocksDistribution>() { 079 080 @Override 081 public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri, 082 HDFSBlocksDistribution oldValue) throws Exception { 083 return executor.submit(new Callable<HDFSBlocksDistribution>() { 084 @Override 085 public HDFSBlocksDistribution call() throws Exception { 086 return internalGetTopBlockLocation(hri); 087 } 088 }); 089 } 090 091 @Override 092 public HDFSBlocksDistribution load(RegionInfo key) throws Exception { 093 return internalGetTopBlockLocation(key); 094 } 095 }; 096 097 // The cache for where regions are located. 098 private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null; 099 100 RegionLocationFinder() { 101 this.cache = createCache(); 102 executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5, 103 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("region-location-%d").build())); 104 } 105 106 /** 107 * Create a cache for region to list of servers 108 * @return A new Cache. 109 */ 110 private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() { 111 return CacheBuilder.newBuilder().expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS) 112 .build(loader); 113 } 114 115 public Configuration getConf() { 116 return conf; 117 } 118 119 public void setConf(Configuration conf) { 120 this.conf = conf; 121 } 122 123 public void setServices(MasterServices services) { 124 this.services = services; 125 } 126 127 public void setClusterMetrics(ClusterMetrics status) { 128 long currentTime = EnvironmentEdgeManager.currentTime(); 129 130 if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) { 131 this.status = status; 132 // Only count the refresh if it includes user tables ( eg more than meta and namespace ). 133 lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh; 134 } else { 135 refreshLocalityChangedRegions(this.status, status); 136 this.status = status; 137 } 138 } 139 140 /** 141 * If locality for a region has changed, that pretty certainly means our cache is out of date. 142 * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality. 143 */ 144 private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) { 145 if (oldStatus == null || newStatus == null) { 146 LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus, 147 newStatus); 148 return; 149 } 150 151 Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics(); 152 Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics(); 153 154 Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size()); 155 for (RegionInfo regionInfo : cache.asMap().keySet()) { 156 regionsByName.put(regionInfo.getEncodedName(), regionInfo); 157 } 158 159 for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) { 160 Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics(); 161 for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) { 162 String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey()); 163 RegionInfo region = regionsByName.get(encodedName); 164 if (region == null) { 165 continue; 166 } 167 168 float newLocality = regionEntry.getValue().getDataLocality(); 169 float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers); 170 171 if (Math.abs(newLocality - oldLocality) > EPSILON) { 172 LOG.debug("Locality for region {} changed from {} to {}, refreshing cache", 173 region.getEncodedName(), oldLocality, newLocality); 174 cache.refresh(region); 175 } 176 } 177 178 } 179 } 180 181 private float getOldLocality(ServerName newServer, byte[] regionName, 182 Map<ServerName, ServerMetrics> oldServers) { 183 ServerMetrics serverMetrics = oldServers.get(newServer); 184 if (serverMetrics == null) { 185 return -1f; 186 } 187 RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName); 188 if (regionMetrics == null) { 189 return -1f; 190 } 191 192 return regionMetrics.getDataLocality(); 193 } 194 195 /** 196 * Refresh all the region locations. 197 * @return true if user created regions got refreshed. 198 */ 199 private boolean scheduleFullRefresh() { 200 // Protect from anything being null while starting up. 201 if (services == null) { 202 return false; 203 } 204 205 final AssignmentManager am = services.getAssignmentManager(); 206 if (am == null) { 207 return false; 208 } 209 210 // TODO: Should this refresh all the regions or only the ones assigned? 211 boolean includesUserTables = false; 212 for (final RegionInfo hri : am.getAssignedRegions()) { 213 cache.refresh(hri); 214 includesUserTables = includesUserTables || !hri.getTable().isSystemTable(); 215 } 216 return includesUserTables; 217 } 218 219 protected List<ServerName> getTopBlockLocations(RegionInfo region) { 220 List<String> topHosts = getBlockDistribution(region).getTopHosts(); 221 return mapHostNameToServerName(topHosts); 222 } 223 224 /** 225 * Returns an ordered list of hosts which have better locality for this region than the current 226 * host. 227 */ 228 protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) { 229 HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); 230 List<String> topHosts = new ArrayList<>(); 231 for (String host : blocksDistribution.getTopHosts()) { 232 if (host.equals(currentHost)) { 233 break; 234 } 235 topHosts.add(host); 236 } 237 return mapHostNameToServerName(topHosts); 238 } 239 240 /** 241 * Returns an ordered list of hosts that are hosting the blocks for this region. The weight of 242 * each host is the sum of the block lengths of all files on that host, so the first host in the 243 * list is the server which holds the most bytes of the given region's HFiles. 244 * @param region region 245 * @return ordered list of hosts holding blocks of the specified region 246 */ 247 protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) { 248 try { 249 TableDescriptor tableDescriptor = getTableDescriptor(region.getTable()); 250 if (tableDescriptor != null) { 251 HDFSBlocksDistribution blocksDistribution = 252 HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region); 253 return blocksDistribution; 254 } 255 } catch (IOException ioe) { 256 LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}", 257 region.getEncodedName(), ioe); 258 } 259 260 return EMPTY_BLOCK_DISTRIBUTION; 261 } 262 263 /** 264 * return TableDescriptor for a given tableName 265 * @param tableName the table name 266 */ 267 protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException { 268 TableDescriptor tableDescriptor = null; 269 try { 270 if (this.services != null && this.services.getTableDescriptors() != null) { 271 tableDescriptor = this.services.getTableDescriptors().get(tableName); 272 } 273 } catch (FileNotFoundException fnfe) { 274 LOG.debug("tableName={}", tableName, fnfe); 275 } 276 277 return tableDescriptor; 278 } 279 280 /** 281 * Map hostname to ServerName, The output ServerName list will have the same order as input hosts. 282 * @param hosts the list of hosts 283 * @return ServerName list 284 */ 285 List<ServerName> mapHostNameToServerName(List<String> hosts) { 286 if (hosts == null || status == null) { 287 if (hosts == null) { 288 LOG.warn("RegionLocationFinder top hosts is null"); 289 } 290 return Lists.newArrayList(); 291 } 292 293 List<ServerName> topServerNames = new ArrayList<>(); 294 Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet(); 295 296 // create a mapping from hostname to ServerName for fast lookup 297 HashMap<String, List<ServerName>> hostToServerName = new HashMap<>(); 298 for (ServerName sn : regionServers) { 299 String host = sn.getHostname(); 300 if (!hostToServerName.containsKey(host)) { 301 hostToServerName.put(host, new ArrayList<>()); 302 } 303 hostToServerName.get(host).add(sn); 304 } 305 306 for (String host : hosts) { 307 if (!hostToServerName.containsKey(host)) { 308 continue; 309 } 310 for (ServerName sn : hostToServerName.get(host)) { 311 // it is possible that HDFS is up ( thus host is valid ), 312 // but RS is down ( thus sn is null ) 313 if (sn != null) { 314 topServerNames.add(sn); 315 } 316 } 317 } 318 return topServerNames; 319 } 320 321 public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) { 322 HDFSBlocksDistribution blockDistbn = null; 323 try { 324 if (cache.asMap().containsKey(hri)) { 325 blockDistbn = cache.get(hri); 326 return blockDistbn; 327 } else { 328 LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString()); 329 blockDistbn = internalGetTopBlockLocation(hri); 330 cache.put(hri, blockDistbn); 331 return blockDistbn; 332 } 333 } catch (ExecutionException e) { 334 LOG.warn("Error while fetching cache entry ", e); 335 blockDistbn = internalGetTopBlockLocation(hri); 336 cache.put(hri, blockDistbn); 337 return blockDistbn; 338 } 339 } 340 341 private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) { 342 try { 343 return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); 344 } catch (Exception e) { 345 return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); 346 } 347 } 348 349 public void refreshAndWait(Collection<RegionInfo> hris) { 350 ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = 351 new ArrayList<>(hris.size()); 352 for (RegionInfo hregionInfo : hris) { 353 regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo)); 354 } 355 int index = 0; 356 for (RegionInfo hregionInfo : hris) { 357 ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index); 358 try { 359 cache.put(hregionInfo, future.get()); 360 } catch (InterruptedException ite) { 361 Thread.currentThread().interrupt(); 362 } catch (ExecutionException ee) { 363 LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}", 364 hregionInfo.getEncodedName(), ee); 365 } 366 index++; 367 } 368 } 369 370 // For test 371 LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() { 372 return cache; 373 } 374}