001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.Set; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.ConcurrentNavigableMap; 027import java.util.concurrent.CopyOnWriteArraySet; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.RegionLocations; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * A cache implementation for region locations from meta. 041 */ 042@InterfaceAudience.Private 043public class MetaCache { 044 045 private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class); 046 047 /** 048 * Map of table to table {@link HRegionLocation}s. <br> 049 * Despite being Concurrent, writes to the map should be synchronized because we have cases where 050 * we need to make multiple updates atomically. 051 */ 052 private final ConcurrentMap<TableName, 053 ConcurrentNavigableMap<byte[], RegionLocations>> cachedRegionLocations = 054 new CopyOnWriteArrayMap<>(); 055 056 // The presence of a server in the map implies it's likely that there is an 057 // entry in cachedRegionLocations that map to this server; but the absence 058 // of a server in this map guarantees that there is no entry in cache that 059 // maps to the absent server. 060 // The access to this attribute must be protected by a lock on cachedRegionLocations 061 private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>(); 062 063 private final MetricsConnection metrics; 064 065 public MetaCache(MetricsConnection metrics) { 066 this.metrics = metrics; 067 } 068 069 /** 070 * Search the cache for a location that fits our table and row key. Return null if no suitable 071 * region is located. 072 * @return Null or region location found in cache. 073 */ 074 public RegionLocations getCachedLocation(final TableName tableName, final byte[] row) { 075 ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 076 077 Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row); 078 if (e == null) { 079 if (metrics != null) metrics.incrMetaCacheMiss(); 080 return null; 081 } 082 RegionLocations possibleRegion = e.getValue(); 083 084 // make sure that the end key is greater than the row we're looking 085 // for, otherwise the row actually belongs in the next region, not 086 // this one. the exception case is when the endkey is 087 // HConstants.EMPTY_END_ROW, signifying that the region we're 088 // checking is actually the last region in the table. 089 byte[] endKey = possibleRegion.getRegionLocation().getRegion().getEndKey(); 090 // Here we do direct Bytes.compareTo and not doing CellComparator/MetaCellComparator path. 091 // MetaCellComparator is for comparing against data in META table which need special handling. 092 // Not doing that is ok for this case because 093 // 1. We are getting the Region location for the given row in non META tables only. The compare 094 // checks the given row is within the end key of the found region. So META regions are not 095 // coming in here. 096 // 2. Even if META region comes in, its end key will be empty byte[] and so Bytes.equals(endKey, 097 // HConstants.EMPTY_END_ROW) check itself will pass. 098 if ( 099 Bytes.equals(endKey, HConstants.EMPTY_END_ROW) 100 || Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0 101 ) { 102 if (metrics != null) metrics.incrMetaCacheHit(); 103 return possibleRegion; 104 } 105 106 if (LOG.isTraceEnabled()) { 107 LOG.trace("Requested row {} comes after region end key of {} for cached location {}", 108 Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), possibleRegion); 109 } 110 // Passed all the way through, so we got nothing - complete cache miss 111 if (metrics != null) metrics.incrMetaCacheMiss(); 112 return null; 113 } 114 115 /** 116 * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to 117 * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be 118 * atomic. 119 * @param tableName The table name. 120 * @param source the source of the new location 121 * @param location the new location 122 */ 123 public synchronized void cacheLocation(final TableName tableName, final ServerName source, 124 final HRegionLocation location) { 125 assert source != null; 126 byte[] startKey = location.getRegion().getStartKey(); 127 ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 128 RegionLocations locations = new RegionLocations(new HRegionLocation[] { location }); 129 RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations); 130 boolean isNewCacheEntry = (oldLocations == null); 131 if (isNewCacheEntry) { 132 if (LOG.isTraceEnabled()) { 133 LOG.trace("Cached location: " + location); 134 } 135 addToCachedServers(locations); 136 MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations); 137 return; 138 } 139 140 // If the server in cache sends us a redirect, assume it's always valid. 141 HRegionLocation oldLocation = 142 oldLocations.getRegionLocation(location.getRegion().getReplicaId()); 143 boolean force = oldLocation != null && oldLocation.getServerName() != null 144 && oldLocation.getServerName().equals(source); 145 146 // For redirect if the number is equal to previous 147 // record, the most common case is that first the region was closed with seqNum, and then 148 // opened with the same seqNum; hence we will ignore the redirect. 149 // There are so many corner cases with various combinations of opens and closes that 150 // an additional counter on top of seqNum would be necessary to handle them all. 151 RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force); 152 if (oldLocations != updatedLocations) { 153 tableLocations.put(startKey, updatedLocations); 154 MetaCacheUtil.cleanProblematicOverlappedRegions(updatedLocations, tableLocations); 155 if (LOG.isTraceEnabled()) { 156 LOG.trace("Changed cached location to: " + location); 157 } 158 addToCachedServers(updatedLocations); 159 } 160 } 161 162 /** 163 * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to 164 * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be 165 * atomic. 166 * @param tableName The table name. 167 * @param locations the new locations 168 */ 169 public synchronized void cacheLocation(final TableName tableName, 170 final RegionLocations locations) { 171 byte[] startKey = locations.getRegionLocation().getRegion().getStartKey(); 172 ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 173 RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations); 174 boolean isNewCacheEntry = (oldLocation == null); 175 if (isNewCacheEntry) { 176 if (LOG.isTraceEnabled()) { 177 LOG.trace("Cached location: " + locations); 178 } 179 addToCachedServers(locations); 180 MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations); 181 return; 182 } 183 184 // merge old and new locations and add it to the cache 185 // Meta record might be stale - some (probably the same) server has closed the region 186 // with later seqNum and told us about the new location. 187 RegionLocations mergedLocation = oldLocation.mergeLocations(locations); 188 tableLocations.put(startKey, mergedLocation); 189 MetaCacheUtil.cleanProblematicOverlappedRegions(mergedLocation, tableLocations); 190 if (LOG.isTraceEnabled()) { 191 LOG.trace("Merged cached locations: " + mergedLocation); 192 } 193 addToCachedServers(locations); 194 } 195 196 private void addToCachedServers(RegionLocations locations) { 197 for (HRegionLocation loc : locations.getRegionLocations()) { 198 if (loc != null) { 199 cachedServers.add(loc.getServerName()); 200 } 201 } 202 } 203 204 /** 205 * Returns Map of cached locations for passed <code>tableName</code>.<br> 206 * Despite being Concurrent, writes to the map should be synchronized because we have cases where 207 * we need to make multiple updates atomically. 208 */ 209 private ConcurrentNavigableMap<byte[], RegionLocations> 210 getTableLocations(final TableName tableName) { 211 // find the map of cached locations for this table 212 return computeIfAbsent(cachedRegionLocations, tableName, 213 () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR)); 214 } 215 216 /** 217 * Check the region cache to see whether a region is cached yet or not. 218 * @param tableName tableName 219 * @param row row 220 * @return Region cached or not. 221 */ 222 public boolean isRegionCached(TableName tableName, final byte[] row) { 223 RegionLocations location = getCachedLocation(tableName, row); 224 return location != null; 225 } 226 227 /** 228 * Return the number of cached region for a table. It will only be called from a unit test. 229 */ 230 public int getNumberOfCachedRegionLocations(final TableName tableName) { 231 Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName); 232 if (tableLocs == null) { 233 return 0; 234 } 235 int numRegions = 0; 236 for (RegionLocations tableLoc : tableLocs.values()) { 237 numRegions += tableLoc.numNonNullElements(); 238 } 239 return numRegions; 240 } 241 242 /** 243 * Delete all cached entries. <br> 244 * Synchronized because of calls in cacheLocation which need to be executed atomically 245 */ 246 public synchronized void clearCache() { 247 this.cachedRegionLocations.clear(); 248 this.cachedServers.clear(); 249 } 250 251 /** 252 * Delete all cached entries of a server. <br> 253 * Synchronized because of calls in cacheLocation which need to be executed atomically 254 */ 255 public synchronized void clearCache(final ServerName serverName) { 256 // Prior to synchronizing this method, we used to do another check below while synchronizing 257 // on cachedServers. This is no longer necessary since we moved synchronization up. 258 // Prior reason: 259 // We block here, because if there is an error on a server, it's likely that multiple 260 // threads will get the error simultaneously. If there are hundreds of thousand of 261 // region location to check, it's better to do this only once. A better pattern would 262 // be to check if the server is dead when we get the region location. 263 if (!this.cachedServers.contains(serverName)) { 264 return; 265 } 266 267 boolean deletedSomething = false; 268 for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()) { 269 for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) { 270 RegionLocations regionLocations = e.getValue(); 271 if (regionLocations != null) { 272 RegionLocations updatedLocations = regionLocations.removeByServer(serverName); 273 if (updatedLocations != regionLocations) { 274 deletedSomething = true; 275 if (updatedLocations.isEmpty()) { 276 tableLocations.remove(e.getKey()); 277 } else { 278 tableLocations.put(e.getKey(), updatedLocations); 279 } 280 } 281 } 282 } 283 } 284 this.cachedServers.remove(serverName); 285 if (deletedSomething) { 286 if (metrics != null) { 287 metrics.incrMetaCacheNumClearServer(); 288 } 289 if (LOG.isTraceEnabled()) { 290 LOG.trace("Removed all cached region locations that map to " + serverName); 291 } 292 } 293 } 294 295 /** 296 * Delete a cached location, no matter what it is. Called when we were told to not use cache.<br> 297 * Synchronized because of calls in cacheLocation which need to be executed atomically 298 * @param tableName tableName 299 */ 300 public synchronized void clearCache(final TableName tableName, final byte[] row) { 301 ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 302 303 RegionLocations regionLocations = getCachedLocation(tableName, row); 304 if (regionLocations != null) { 305 byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); 306 tableLocations.remove(startKey); 307 if (metrics != null) { 308 metrics.incrMetaCacheNumClearRegion(); 309 } 310 if (LOG.isTraceEnabled()) { 311 LOG.trace("Removed " + regionLocations + " from cache"); 312 } 313 } 314 } 315 316 /** 317 * Delete all cached entries of a table.<br> 318 * Synchronized because of calls in cacheLocation which need to be executed atomically 319 */ 320 public synchronized void clearCache(final TableName tableName) { 321 if (LOG.isTraceEnabled()) { 322 LOG.trace("Removed all cached region locations for table " + tableName); 323 } 324 this.cachedRegionLocations.remove(tableName); 325 } 326 327 /** 328 * Delete a cached location with specific replicaId.<br> 329 * Synchronized because of calls in cacheLocation which need to be executed atomically 330 * @param tableName tableName 331 * @param row row key 332 * @param replicaId region replica id 333 */ 334 public synchronized void clearCache(final TableName tableName, final byte[] row, int replicaId) { 335 ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 336 337 RegionLocations regionLocations = getCachedLocation(tableName, row); 338 if (regionLocations != null) { 339 HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); 340 if (toBeRemoved != null) { 341 RegionLocations updatedLocations = regionLocations.remove(replicaId); 342 byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); 343 if (updatedLocations.isEmpty()) { 344 tableLocations.remove(startKey); 345 } else { 346 tableLocations.put(startKey, updatedLocations); 347 } 348 349 if (metrics != null) { 350 metrics.incrMetaCacheNumClearRegion(); 351 } 352 if (LOG.isTraceEnabled()) { 353 LOG.trace("Removed " + toBeRemoved + " from cache"); 354 } 355 } 356 } 357 } 358 359 /** 360 * Delete a cached location for a table, row and server. <br> 361 * Synchronized because of calls in cacheLocation which need to be executed atomically 362 */ 363 public synchronized void clearCache(final TableName tableName, final byte[] row, 364 ServerName serverName) { 365 ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); 366 367 RegionLocations regionLocations = getCachedLocation(tableName, row); 368 if (regionLocations != null) { 369 RegionLocations updatedLocations = regionLocations.removeByServer(serverName); 370 if (updatedLocations != regionLocations) { 371 byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); 372 if (updatedLocations.isEmpty()) { 373 tableLocations.remove(startKey); 374 } else { 375 tableLocations.put(startKey, updatedLocations); 376 } 377 if (metrics != null) { 378 metrics.incrMetaCacheNumClearRegion(); 379 } 380 if (LOG.isTraceEnabled()) { 381 LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row) 382 + " mapping to server: " + serverName + " from cache"); 383 } 384 } 385 } 386 } 387 388 /** 389 * Deletes the cached location of the region if necessary, based on some error from source.<br> 390 * Synchronized because of calls in cacheLocation which need to be executed atomically 391 * @param hri The region in question. 392 */ 393 public synchronized void clearCache(RegionInfo hri) { 394 ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable()); 395 RegionLocations regionLocations = tableLocations.get(hri.getStartKey()); 396 if (regionLocations != null) { 397 HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId()); 398 if (oldLocation == null) return; 399 RegionLocations updatedLocations = regionLocations.remove(oldLocation); 400 if (updatedLocations != regionLocations) { 401 if (updatedLocations.isEmpty()) { 402 tableLocations.remove(hri.getStartKey()); 403 } else { 404 tableLocations.put(hri.getStartKey(), updatedLocations); 405 } 406 if (metrics != null) { 407 metrics.incrMetaCacheNumClearRegion(); 408 } 409 if (LOG.isTraceEnabled()) { 410 LOG.trace("Removed " + oldLocation + " from cache"); 411 } 412 } 413 } 414 } 415 416}