001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
021import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
024
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Map;
028import java.util.concurrent.ConcurrentNavigableMap;
029import java.util.concurrent.ConcurrentSkipListMap;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.RegionLocations;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Objects;
039
040/**
041 * Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around
042 * ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but reads
043 * are not.
044 */
045final class AsyncRegionLocationCache {
046
047  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class);
048
049  private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
050    new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
051  private final TableName tableName;
052
053  public AsyncRegionLocationCache(TableName tableName) {
054    this.tableName = tableName;
055  }
056
057  /**
058   * Add the given locations to the cache, merging with existing if necessary. Also cleans out any
059   * previously cached locations which may have been superseded by this one (i.e. in case of merged
060   * regions). See {@link #cleanProblematicOverlappedRegions(RegionLocations)}
061   * @param locs the locations to cache
062   * @return the final location (possibly merged) that was added to the cache
063   */
064  public synchronized RegionLocations add(RegionLocations locs) {
065    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
066    RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
067    if (oldLocs == null) {
068      cleanProblematicOverlappedRegions(locs);
069      return locs;
070    }
071
072    // check whether the regions are the same, this usually happens when table is split/merged,
073    // or deleted and recreated again.
074    RegionInfo region = locs.getRegionLocation().getRegion();
075    RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
076    if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
077      RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
078      if (isEqual(mergedLocs, oldLocs)) {
079        // the merged one is the same with the old one, give up
080        LOG.trace("Will not add {} to cache because the old value {} "
081          + " is newer than us or has the same server name."
082          + " Maybe it is updated before we replace it", locs, oldLocs);
083        return oldLocs;
084      }
085      locs = mergedLocs;
086    } else {
087      // the region is different, here we trust the one we fetched. This maybe wrong but finally
088      // the upper layer can detect this and trigger removal of the wrong locations
089      if (LOG.isDebugEnabled()) {
090        LOG.debug("The newly fetch region {} is different from the old one {} for row '{}',"
091          + " try replaying the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
092      }
093    }
094
095    cache.put(startKey, locs);
096    cleanProblematicOverlappedRegions(locs);
097    return locs;
098  }
099
100  /**
101   * When caching a location, the region may have been the result of a merge. Check to see if the
102   * region's boundaries overlap any other cached locations in a problematic way. Those would have
103   * been merge parents which no longer exist. We need to proactively clear them out to avoid a case
104   * where a merged region which receives no requests never gets cleared. This causes requests to
105   * other merged regions after it to see the wrong cached location.
106   * <p>
107   * For example, if we have Start_New < Start_Old < End_Old < End_New, then if we only access
108   * within range [End_Old, End_New], then it will always return the old region but it will then
109   * find out the row is not in the range, and try to get the new region, and then we get
110   * [Start_New, End_New), still fall into the same situation.
111   * <p>
112   * If Start_Old is less than Start_New, even if we have overlap, it is not a problem, as when the
113   * row is greater than Start_New, we will locate to the new region, and if the row is less than
114   * Start_New, it will fall into the old region's range and we will try to access the region and
115   * get a NotServing exception, and then we will clean the cache.
116   * <p>
117   * See HBASE-27650
118   * @param locations the new location that was just cached
119   */
120  private void cleanProblematicOverlappedRegions(RegionLocations locations) {
121    RegionInfo region = locations.getRegionLocation().getRegion();
122
123    boolean isLast = isEmptyStopRow(region.getEndKey());
124
125    while (true) {
126      Map.Entry<byte[], RegionLocations> overlap =
127        isLast ? cache.lastEntry() : cache.lowerEntry(region.getEndKey());
128      if (
129        overlap == null || overlap.getValue() == locations
130          || Bytes.equals(overlap.getKey(), region.getStartKey())
131      ) {
132        break;
133      }
134
135      if (LOG.isDebugEnabled()) {
136        LOG.debug(
137          "Removing cached location {} (endKey={}) because it overlaps with "
138            + "new location {} (endKey={})",
139          overlap.getValue(),
140          Bytes.toStringBinary(overlap.getValue().getRegionLocation().getRegion().getEndKey()),
141          locations, Bytes.toStringBinary(locations.getRegionLocation().getRegion().getEndKey()));
142      }
143
144      cache.remove(overlap.getKey());
145    }
146  }
147
148  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
149    HRegionLocation[] locArr1 = locs1.getRegionLocations();
150    HRegionLocation[] locArr2 = locs2.getRegionLocations();
151    if (locArr1.length != locArr2.length) {
152      return false;
153    }
154    for (int i = 0; i < locArr1.length; i++) {
155      // do not need to compare region info
156      HRegionLocation loc1 = locArr1[i];
157      HRegionLocation loc2 = locArr2[i];
158      if (loc1 == null) {
159        if (loc2 != null) {
160          return false;
161        }
162      } else {
163        if (loc2 == null) {
164          return false;
165        }
166        if (loc1.getSeqNum() != loc2.getSeqNum()) {
167          return false;
168        }
169        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
170          return false;
171        }
172      }
173    }
174    return true;
175  }
176
177  /**
178   * Returns all cached RegionLocations
179   */
180  public Collection<RegionLocations> getAll() {
181    return Collections.unmodifiableCollection(cache.values());
182  }
183
184  /**
185   * Gets the RegionLocations for a given region's startKey. This is a direct lookup, if the key
186   * does not exist in the cache it will return null.
187   * @param startKey region start key to directly look up
188   */
189  public RegionLocations get(byte[] startKey) {
190    return cache.get(startKey);
191  }
192
193  /**
194   * Finds the RegionLocations for the region with the greatest startKey less than or equal to the
195   * given row
196   * @param row row to find locations
197   */
198  public RegionLocations findForRow(byte[] row, int replicaId) {
199    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
200    if (entry == null) {
201      return null;
202    }
203    RegionLocations locs = entry.getValue();
204    if (locs == null) {
205      return null;
206    }
207    HRegionLocation loc = locs.getRegionLocation(replicaId);
208    if (loc == null) {
209      return null;
210    }
211    byte[] endKey = loc.getRegion().getEndKey();
212    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
213      if (LOG.isTraceEnabled()) {
214        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
215          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
216      }
217      return locs;
218    } else {
219      if (LOG.isTraceEnabled()) {
220        LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
221          Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs);
222      }
223      return null;
224    }
225  }
226
227  /**
228   * Finds the RegionLocations for the region with the greatest startKey strictly less than the
229   * given row
230   * @param row row to find locations
231   */
232  public RegionLocations findForBeforeRow(byte[] row, int replicaId) {
233    boolean isEmptyStopRow = isEmptyStopRow(row);
234    Map.Entry<byte[], RegionLocations> entry =
235      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
236    if (entry == null) {
237      return null;
238    }
239    RegionLocations locs = entry.getValue();
240    if (locs == null) {
241      return null;
242    }
243    HRegionLocation loc = locs.getRegionLocation(replicaId);
244    if (loc == null) {
245      return null;
246    }
247    if (
248      isEmptyStopRow(loc.getRegion().getEndKey())
249        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
250    ) {
251      if (LOG.isTraceEnabled()) {
252        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
253          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
254      }
255      return locs;
256    } else {
257      return null;
258    }
259  }
260
261  /**
262   * Removes the location from the cache if it exists and can be removed.
263   * @return true if entry was removed
264   */
265  public synchronized boolean remove(HRegionLocation loc) {
266    byte[] startKey = loc.getRegion().getStartKey();
267    RegionLocations oldLocs = cache.get(startKey);
268    if (oldLocs == null) {
269      return false;
270    }
271
272    HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
273    if (!canUpdateOnError(loc, oldLoc)) {
274      return false;
275    }
276
277    RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
278    if (newLocs == null) {
279      if (cache.remove(startKey, oldLocs)) {
280        return true;
281      }
282    } else {
283      cache.put(startKey, newLocs);
284      return true;
285    }
286    return false;
287  }
288
289  /**
290   * Returns the size of the region locations cache
291   */
292  public int size() {
293    return cache.size();
294  }
295
296  /**
297   * Removes serverName from all locations in the cache, fully removing any RegionLocations which
298   * are empty after removing the server from it.
299   * @param serverName server to remove from locations
300   */
301  public synchronized void removeForServer(ServerName serverName) {
302    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
303      byte[] regionName = entry.getKey();
304      RegionLocations locs = entry.getValue();
305      RegionLocations newLocs = locs.removeByServer(serverName);
306      if (locs == newLocs) {
307        continue;
308      }
309      if (newLocs.isEmpty()) {
310        cache.remove(regionName, locs);
311      } else {
312        cache.put(regionName, newLocs);
313      }
314    }
315  }
316}