001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
021import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
024
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Map;
028import java.util.concurrent.ConcurrentNavigableMap;
029import java.util.concurrent.ConcurrentSkipListMap;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.RegionLocations;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Objects;
039
040/**
041 * Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around
042 * ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but reads
043 * are not.
044 */
045final class AsyncRegionLocationCache {
046
047  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class);
048
049  private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
050    new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
051  private final TableName tableName;
052
053  public AsyncRegionLocationCache(TableName tableName) {
054    this.tableName = tableName;
055  }
056
057  /**
058   * Add the given locations to the cache, merging with existing if necessary. Also cleans out any
059   * previously cached locations which may have been superseded by this one (i.e. in case of merged
060   * regions). See {@link MetaCacheUtil} cleanProblematicOverlappedRegions
061   * @param locs the locations to cache
062   * @return the final location (possibly merged) that was added to the cache
063   */
064  public synchronized RegionLocations add(RegionLocations locs) {
065    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
066    RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
067    if (oldLocs == null) {
068      MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache);
069      return locs;
070    }
071
072    // check whether the regions are the same, this usually happens when table is split/merged,
073    // or deleted and recreated again.
074    RegionInfo region = locs.getRegionLocation().getRegion();
075    RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
076    if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
077      RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
078      if (isEqual(mergedLocs, oldLocs)) {
079        // the merged one is the same with the old one, give up
080        LOG.trace("Will not add {} to cache because the old value {} "
081          + " is newer than us or has the same server name."
082          + " Maybe it is updated before we replace it", locs, oldLocs);
083        return oldLocs;
084      }
085      locs = mergedLocs;
086    } else {
087      // the region is different, here we trust the one we fetched. This maybe wrong but finally
088      // the upper layer can detect this and trigger removal of the wrong locations
089      if (LOG.isDebugEnabled()) {
090        LOG.debug("The newly fetch region {} is different from the old one {} for row '{}',"
091          + " try replaying the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
092      }
093    }
094
095    cache.put(startKey, locs);
096    MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache);
097    return locs;
098  }
099
100  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
101    HRegionLocation[] locArr1 = locs1.getRegionLocations();
102    HRegionLocation[] locArr2 = locs2.getRegionLocations();
103    if (locArr1.length != locArr2.length) {
104      return false;
105    }
106    for (int i = 0; i < locArr1.length; i++) {
107      // do not need to compare region info
108      HRegionLocation loc1 = locArr1[i];
109      HRegionLocation loc2 = locArr2[i];
110      if (loc1 == null) {
111        if (loc2 != null) {
112          return false;
113        }
114      } else {
115        if (loc2 == null) {
116          return false;
117        }
118        if (loc1.getSeqNum() != loc2.getSeqNum()) {
119          return false;
120        }
121        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
122          return false;
123        }
124      }
125    }
126    return true;
127  }
128
129  /**
130   * Returns all cached RegionLocations
131   */
132  public Collection<RegionLocations> getAll() {
133    return Collections.unmodifiableCollection(cache.values());
134  }
135
136  /**
137   * Gets the RegionLocations for a given region's startKey. This is a direct lookup, if the key
138   * does not exist in the cache it will return null.
139   * @param startKey region start key to directly look up
140   */
141  public RegionLocations get(byte[] startKey) {
142    return cache.get(startKey);
143  }
144
145  /**
146   * Finds the RegionLocations for the region with the greatest startKey less than or equal to the
147   * given row
148   * @param row row to find locations
149   */
150  public RegionLocations findForRow(byte[] row, int replicaId) {
151    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
152    if (entry == null) {
153      return null;
154    }
155    RegionLocations locs = entry.getValue();
156    if (locs == null) {
157      return null;
158    }
159    HRegionLocation loc = locs.getRegionLocation(replicaId);
160    if (loc == null) {
161      return null;
162    }
163    byte[] endKey = loc.getRegion().getEndKey();
164    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
165      if (LOG.isTraceEnabled()) {
166        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
167          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
168      }
169      return locs;
170    } else {
171      if (LOG.isTraceEnabled()) {
172        LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
173          Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs);
174      }
175      return null;
176    }
177  }
178
179  /**
180   * Finds the RegionLocations for the region with the greatest startKey strictly less than the
181   * given row
182   * @param row row to find locations
183   */
184  public RegionLocations findForBeforeRow(byte[] row, int replicaId) {
185    boolean isEmptyStopRow = isEmptyStopRow(row);
186    Map.Entry<byte[], RegionLocations> entry =
187      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
188    if (entry == null) {
189      return null;
190    }
191    RegionLocations locs = entry.getValue();
192    if (locs == null) {
193      return null;
194    }
195    HRegionLocation loc = locs.getRegionLocation(replicaId);
196    if (loc == null) {
197      return null;
198    }
199    if (
200      isEmptyStopRow(loc.getRegion().getEndKey())
201        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
202    ) {
203      if (LOG.isTraceEnabled()) {
204        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
205          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
206      }
207      return locs;
208    } else {
209      return null;
210    }
211  }
212
213  /**
214   * Removes the location from the cache if it exists and can be removed.
215   * @return true if entry was removed
216   */
217  public synchronized boolean remove(HRegionLocation loc) {
218    byte[] startKey = loc.getRegion().getStartKey();
219    RegionLocations oldLocs = cache.get(startKey);
220    if (oldLocs == null) {
221      return false;
222    }
223
224    HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
225    if (!canUpdateOnError(loc, oldLoc)) {
226      return false;
227    }
228
229    RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
230    if (newLocs == null) {
231      if (cache.remove(startKey, oldLocs)) {
232        return true;
233      }
234    } else {
235      cache.put(startKey, newLocs);
236      return true;
237    }
238    return false;
239  }
240
241  /**
242   * Returns the size of the region locations cache
243   */
244  public int size() {
245    return cache.size();
246  }
247
248  /**
249   * Removes serverName from all locations in the cache, fully removing any RegionLocations which
250   * are empty after removing the server from it.
251   * @param serverName server to remove from locations
252   */
253  public synchronized void removeForServer(ServerName serverName) {
254    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
255      byte[] regionName = entry.getKey();
256      RegionLocations locs = entry.getValue();
257      RegionLocations newLocs = locs.removeByServer(serverName);
258      if (locs == newLocs) {
259        continue;
260      }
261      if (newLocs.isEmpty()) {
262        cache.remove(regionName, locs);
263      } else {
264        cache.put(regionName, newLocs);
265      }
266    }
267  }
268}