/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Objects;

/**
 * Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around
 * ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but
 * reads are not.
 */
final class AsyncRegionLocationCache {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class);

  private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
    new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
  private final TableName tableName;

  public AsyncRegionLocationCache(TableName tableName) {
    this.tableName = tableName;
  }

  /**
   * Add the given locations to the cache, merging with existing if necessary. Also cleans out any
   * previously cached locations which may have been superseded by this one (i.e. in the case of
   * merged regions). See {@link #cleanProblematicOverlappedRegions(RegionLocations)}
   * @param locs the locations to cache
   * @return the final location (possibly merged) that was added to the cache
   */
  public synchronized RegionLocations add(RegionLocations locs) {
    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
    RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
    if (oldLocs == null) {
      cleanProblematicOverlappedRegions(locs);
      return locs;
    }

    // Check whether the regions are the same; this usually happens when the table is
    // split/merged, or deleted and recreated.
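    // If the regions match, merge the new locations into the cached ones and only replace the
    // cached value when the merge actually changes something; otherwise the cached value is at
    // least as up to date (see the seqNum/server name comparison in isEqual below).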
    RegionInfo region = locs.getRegionLocation().getRegion();
    RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
    if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
      RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
      if (isEqual(mergedLocs, oldLocs)) {
        // the merged one is the same as the old one, give up
        LOG.trace("Will not add {} to cache because the old value {} is newer than us or has the"
          + " same server name. Maybe it was updated before we could replace it", locs, oldLocs);
        return oldLocs;
      }
      locs = mergedLocs;
    } else {
      // The region is different; here we trust the one we just fetched. This may be wrong, but
      // eventually the upper layer can detect it and trigger removal of the wrong locations.
      if (LOG.isDebugEnabled()) {
        LOG.debug("The newly fetched region {} is different from the old one {} for row '{}',"
          + " try replacing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
      }
    }

    cache.put(startKey, locs);
    cleanProblematicOverlappedRegions(locs);
    return locs;
  }

  /**
   * When caching a location, the region may have been the result of a merge. Check to see if the
   * region's boundaries overlap any other cached locations in a problematic way. Those would have
   * been merge parents which no longer exist. We need to proactively clear them out to avoid the
   * case where a stale merge parent which receives no requests never gets cleared, causing
   * requests for rows in its range to keep seeing the wrong cached location.
   * <p>
   * For example, if we have Start_New < Start_Old < End_Old < End_New and we only access rows in
   * the range [End_Old, End_New), the lookup will always return the old region. We then find that
   * the row is not in its range, fetch the new region [Start_New, End_New), cache it, and still
   * fall into the same situation on the next access.
   * <p>
   * If Start_Old is less than Start_New, even if we have overlap, it is not a problem: when the
   * row is greater than or equal to Start_New we will locate the new region, and when the row is
   * less than Start_New it falls into the old region's range, so we will try to access that
   * region, get a NotServingRegionException, and then clean the cache.
   * <p>
   * See HBASE-27650
   * @param locations the new location that was just cached
   */
  private void cleanProblematicOverlappedRegions(RegionLocations locations) {
    RegionInfo region = locations.getRegionLocation().getRegion();

    boolean isLast = isEmptyStopRow(region.getEndKey());

    while (true) {
      Map.Entry<byte[], RegionLocations> overlap =
        isLast ? cache.lastEntry() : cache.lowerEntry(region.getEndKey());
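      // Stop once there is no candidate, we have reached the entry we just cached, or the
      // candidate starts at the same key as the new region; any other entry whose start key falls
      // before the new region's end key is a stale overlap and is removed.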
      if (
        overlap == null || overlap.getValue() == locations
          || Bytes.equals(overlap.getKey(), region.getStartKey())
      ) {
        break;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Removing cached location {} (endKey={}) because it overlaps with "
            + "new location {} (endKey={})",
          overlap.getValue(),
          Bytes.toStringBinary(overlap.getValue().getRegionLocation().getRegion().getEndKey()),
          locations, Bytes.toStringBinary(locations.getRegionLocation().getRegion().getEndKey()));
      }

      cache.remove(overlap.getKey());
    }
  }

  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
    HRegionLocation[] locArr1 = locs1.getRegionLocations();
    HRegionLocation[] locArr2 = locs2.getRegionLocations();
    if (locArr1.length != locArr2.length) {
      return false;
    }
    for (int i = 0; i < locArr1.length; i++) {
      // no need to compare region info
      HRegionLocation loc1 = locArr1[i];
      HRegionLocation loc2 = locArr2[i];
      if (loc1 == null) {
        if (loc2 != null) {
          return false;
        }
      } else {
        if (loc2 == null) {
          return false;
        }
        if (loc1.getSeqNum() != loc2.getSeqNum()) {
          return false;
        }
        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Returns all cached RegionLocations
   */
  public Collection<RegionLocations> getAll() {
    return Collections.unmodifiableCollection(cache.values());
  }

  /**
   * Gets the RegionLocations for a given region's startKey. This is a direct lookup; if the key
   * does not exist in the cache, null is returned.
   * @param startKey region start key to directly look up
   */
  public RegionLocations get(byte[] startKey) {
    return cache.get(startKey);
  }

  /**
   * Finds the RegionLocations for the region with the greatest startKey less than or equal to the
   * given row
   * @param row row to find locations
   */
  public RegionLocations findForRow(byte[] row, int replicaId) {
    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
    if (entry == null) {
      return null;
    }
    RegionLocations locs = entry.getValue();
    if (locs == null) {
      return null;
    }
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null) {
      return null;
    }
    byte[] endKey = loc.getRegion().getEndKey();
    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
      }
      return locs;
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
          Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs);
      }
      return null;
    }
  }

  /**
   * Finds the RegionLocations for the region with the greatest startKey strictly less than the
   * given row
   * @param row row to find locations
   */
  public RegionLocations findForBeforeRow(byte[] row, int replicaId) {
    boolean isEmptyStopRow = isEmptyStopRow(row);
    Map.Entry<byte[], RegionLocations> entry =
      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
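    // For a reverse lookup the containing region is the one with the greatest start key strictly
    // below the row (or the last region when the row is the empty stop row), and it only matches
    // if its end key is empty or reaches at least up to the row.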
    if (entry == null) {
      return null;
    }
    RegionLocations locs = entry.getValue();
    if (locs == null) {
      return null;
    }
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null) {
      return null;
    }
    if (
      isEmptyStopRow(loc.getRegion().getEndKey())
        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
    ) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
      }
      return locs;
    } else {
      return null;
    }
  }

  /**
   * Removes the location from the cache if it exists and can be removed.
   * @return true if entry was removed
   */
  public synchronized boolean remove(HRegionLocation loc) {
    byte[] startKey = loc.getRegion().getStartKey();
    RegionLocations oldLocs = cache.get(startKey);
    if (oldLocs == null) {
      return false;
    }

    HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
    if (!canUpdateOnError(loc, oldLoc)) {
      return false;
    }

    RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
    if (newLocs == null) {
      if (cache.remove(startKey, oldLocs)) {
        return true;
      }
    } else {
      cache.put(startKey, newLocs);
      return true;
    }
    return false;
  }

  /**
   * Returns the size of the region locations cache
   */
  public int size() {
    return cache.size();
  }

  /**
   * Removes serverName from all locations in the cache, fully removing any RegionLocations which
   * are empty after removing the server from them.
   * @param serverName server to remove from locations
   */
  public synchronized void removeForServer(ServerName serverName) {
    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
      byte[] startKey = entry.getKey();
      RegionLocations locs = entry.getValue();
      RegionLocations newLocs = locs.removeByServer(serverName);
      if (locs == newLocs) {
        continue;
      }
      if (newLocs.isEmpty()) {
        cache.remove(startKey, locs);
      } else {
        cache.put(startKey, newLocs);
      }
    }
  }
}