001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.Set;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ConcurrentNavigableMap;
027import java.util.concurrent.CopyOnWriteArraySet;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.RegionLocations;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A cache implementation for region locations from meta.
041 */
042@InterfaceAudience.Private
043public class MetaCache {
044
045  private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class);
046
047  /**
048   * Map of table to table {@link HRegionLocation}s. <br>
049   * Despite being Concurrent, writes to the map should be synchronized because we have cases where
050   * we need to make multiple updates atomically.
051   */
052  private final ConcurrentMap<TableName,
053    ConcurrentNavigableMap<byte[], RegionLocations>> cachedRegionLocations =
054      new CopyOnWriteArrayMap<>();
055
056  // The presence of a server in the map implies it's likely that there is an
057  // entry in cachedRegionLocations that map to this server; but the absence
058  // of a server in this map guarantees that there is no entry in cache that
059  // maps to the absent server.
060  // The access to this attribute must be protected by a lock on cachedRegionLocations
061  private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();
062
063  private final MetricsConnection metrics;
064
065  public MetaCache(MetricsConnection metrics) {
066    this.metrics = metrics;
067  }
068
069  /**
070   * Search the cache for a location that fits our table and row key. Return null if no suitable
071   * region is located.
072   * @return Null or region location found in cache.
073   */
074  public RegionLocations getCachedLocation(final TableName tableName, final byte[] row) {
075    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
076
077    Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
078    if (e == null) {
079      if (metrics != null) metrics.incrMetaCacheMiss();
080      return null;
081    }
082    RegionLocations possibleRegion = e.getValue();
083
084    // make sure that the end key is greater than the row we're looking
085    // for, otherwise the row actually belongs in the next region, not
086    // this one. the exception case is when the endkey is
087    // HConstants.EMPTY_END_ROW, signifying that the region we're
088    // checking is actually the last region in the table.
089    byte[] endKey = possibleRegion.getRegionLocation().getRegion().getEndKey();
090    // Here we do direct Bytes.compareTo and not doing CellComparator/MetaCellComparator path.
091    // MetaCellComparator is for comparing against data in META table which need special handling.
092    // Not doing that is ok for this case because
093    // 1. We are getting the Region location for the given row in non META tables only. The compare
094    // checks the given row is within the end key of the found region. So META regions are not
095    // coming in here.
096    // 2. Even if META region comes in, its end key will be empty byte[] and so Bytes.equals(endKey,
097    // HConstants.EMPTY_END_ROW) check itself will pass.
098    if (
099      Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
100        || Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0
101    ) {
102      if (metrics != null) metrics.incrMetaCacheHit();
103      return possibleRegion;
104    }
105
106    if (LOG.isTraceEnabled()) {
107      LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
108        Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), possibleRegion);
109    }
110    // Passed all the way through, so we got nothing - complete cache miss
111    if (metrics != null) metrics.incrMetaCacheMiss();
112    return null;
113  }
114
115  /**
116   * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to
117   * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be
118   * atomic.
119   * @param tableName The table name.
120   * @param source    the source of the new location
121   * @param location  the new location
122   */
123  public synchronized void cacheLocation(final TableName tableName, final ServerName source,
124    final HRegionLocation location) {
125    assert source != null;
126    byte[] startKey = location.getRegion().getStartKey();
127    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
128    RegionLocations locations = new RegionLocations(new HRegionLocation[] { location });
129    RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
130    boolean isNewCacheEntry = (oldLocations == null);
131    if (isNewCacheEntry) {
132      if (LOG.isTraceEnabled()) {
133        LOG.trace("Cached location: " + location);
134      }
135      addToCachedServers(locations);
136      MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations);
137      return;
138    }
139
140    // If the server in cache sends us a redirect, assume it's always valid.
141    HRegionLocation oldLocation =
142      oldLocations.getRegionLocation(location.getRegion().getReplicaId());
143    boolean force = oldLocation != null && oldLocation.getServerName() != null
144      && oldLocation.getServerName().equals(source);
145
146    // For redirect if the number is equal to previous
147    // record, the most common case is that first the region was closed with seqNum, and then
148    // opened with the same seqNum; hence we will ignore the redirect.
149    // There are so many corner cases with various combinations of opens and closes that
150    // an additional counter on top of seqNum would be necessary to handle them all.
151    RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
152    if (oldLocations != updatedLocations) {
153      tableLocations.put(startKey, updatedLocations);
154      MetaCacheUtil.cleanProblematicOverlappedRegions(updatedLocations, tableLocations);
155      if (LOG.isTraceEnabled()) {
156        LOG.trace("Changed cached location to: " + location);
157      }
158      addToCachedServers(updatedLocations);
159    }
160  }
161
162  /**
163   * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to
164   * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be
165   * atomic.
166   * @param tableName The table name.
167   * @param locations the new locations
168   */
169  public synchronized void cacheLocation(final TableName tableName,
170    final RegionLocations locations) {
171    byte[] startKey = locations.getRegionLocation().getRegion().getStartKey();
172    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
173    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
174    boolean isNewCacheEntry = (oldLocation == null);
175    if (isNewCacheEntry) {
176      if (LOG.isTraceEnabled()) {
177        LOG.trace("Cached location: " + locations);
178      }
179      addToCachedServers(locations);
180      MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations);
181      return;
182    }
183
184    // merge old and new locations and add it to the cache
185    // Meta record might be stale - some (probably the same) server has closed the region
186    // with later seqNum and told us about the new location.
187    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
188    tableLocations.put(startKey, mergedLocation);
189    MetaCacheUtil.cleanProblematicOverlappedRegions(mergedLocation, tableLocations);
190    if (LOG.isTraceEnabled()) {
191      LOG.trace("Merged cached locations: " + mergedLocation);
192    }
193    addToCachedServers(locations);
194  }
195
196  private void addToCachedServers(RegionLocations locations) {
197    for (HRegionLocation loc : locations.getRegionLocations()) {
198      if (loc != null) {
199        cachedServers.add(loc.getServerName());
200      }
201    }
202  }
203
204  /**
205   * Returns Map of cached locations for passed <code>tableName</code>.<br>
206   * Despite being Concurrent, writes to the map should be synchronized because we have cases where
207   * we need to make multiple updates atomically.
208   */
209  private ConcurrentNavigableMap<byte[], RegionLocations>
210    getTableLocations(final TableName tableName) {
211    // find the map of cached locations for this table
212    return computeIfAbsent(cachedRegionLocations, tableName,
213      () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR));
214  }
215
216  /**
217   * Check the region cache to see whether a region is cached yet or not.
218   * @param tableName tableName
219   * @param row       row
220   * @return Region cached or not.
221   */
222  public boolean isRegionCached(TableName tableName, final byte[] row) {
223    RegionLocations location = getCachedLocation(tableName, row);
224    return location != null;
225  }
226
227  /**
228   * Return the number of cached region for a table. It will only be called from a unit test.
229   */
230  public int getNumberOfCachedRegionLocations(final TableName tableName) {
231    Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
232    if (tableLocs == null) {
233      return 0;
234    }
235    int numRegions = 0;
236    for (RegionLocations tableLoc : tableLocs.values()) {
237      numRegions += tableLoc.numNonNullElements();
238    }
239    return numRegions;
240  }
241
242  /**
243   * Delete all cached entries. <br>
244   * Synchronized because of calls in cacheLocation which need to be executed atomically
245   */
246  public synchronized void clearCache() {
247    this.cachedRegionLocations.clear();
248    this.cachedServers.clear();
249  }
250
251  /**
252   * Delete all cached entries of a server. <br>
253   * Synchronized because of calls in cacheLocation which need to be executed atomically
254   */
255  public synchronized void clearCache(final ServerName serverName) {
256    // Prior to synchronizing this method, we used to do another check below while synchronizing
257    // on cachedServers. This is no longer necessary since we moved synchronization up.
258    // Prior reason:
259    // We block here, because if there is an error on a server, it's likely that multiple
260    // threads will get the error simultaneously. If there are hundreds of thousand of
261    // region location to check, it's better to do this only once. A better pattern would
262    // be to check if the server is dead when we get the region location.
263    if (!this.cachedServers.contains(serverName)) {
264      return;
265    }
266
267    boolean deletedSomething = false;
268    for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()) {
269      for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
270        RegionLocations regionLocations = e.getValue();
271        if (regionLocations != null) {
272          RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
273          if (updatedLocations != regionLocations) {
274            deletedSomething = true;
275            if (updatedLocations.isEmpty()) {
276              tableLocations.remove(e.getKey());
277            } else {
278              tableLocations.put(e.getKey(), updatedLocations);
279            }
280          }
281        }
282      }
283    }
284    this.cachedServers.remove(serverName);
285    if (deletedSomething) {
286      if (metrics != null) {
287        metrics.incrMetaCacheNumClearServer();
288      }
289      if (LOG.isTraceEnabled()) {
290        LOG.trace("Removed all cached region locations that map to " + serverName);
291      }
292    }
293  }
294
295  /**
296   * Delete a cached location, no matter what it is. Called when we were told to not use cache.<br>
297   * Synchronized because of calls in cacheLocation which need to be executed atomically
298   * @param tableName tableName
299   */
300  public synchronized void clearCache(final TableName tableName, final byte[] row) {
301    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
302
303    RegionLocations regionLocations = getCachedLocation(tableName, row);
304    if (regionLocations != null) {
305      byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
306      tableLocations.remove(startKey);
307      if (metrics != null) {
308        metrics.incrMetaCacheNumClearRegion();
309      }
310      if (LOG.isTraceEnabled()) {
311        LOG.trace("Removed " + regionLocations + " from cache");
312      }
313    }
314  }
315
316  /**
317   * Delete all cached entries of a table.<br>
318   * Synchronized because of calls in cacheLocation which need to be executed atomically
319   */
320  public synchronized void clearCache(final TableName tableName) {
321    if (LOG.isTraceEnabled()) {
322      LOG.trace("Removed all cached region locations for table " + tableName);
323    }
324    this.cachedRegionLocations.remove(tableName);
325  }
326
327  /**
328   * Delete a cached location with specific replicaId.<br>
329   * Synchronized because of calls in cacheLocation which need to be executed atomically
330   * @param tableName tableName
331   * @param row       row key
332   * @param replicaId region replica id
333   */
334  public synchronized void clearCache(final TableName tableName, final byte[] row, int replicaId) {
335    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
336
337    RegionLocations regionLocations = getCachedLocation(tableName, row);
338    if (regionLocations != null) {
339      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
340      if (toBeRemoved != null) {
341        RegionLocations updatedLocations = regionLocations.remove(replicaId);
342        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
343        if (updatedLocations.isEmpty()) {
344          tableLocations.remove(startKey);
345        } else {
346          tableLocations.put(startKey, updatedLocations);
347        }
348
349        if (metrics != null) {
350          metrics.incrMetaCacheNumClearRegion();
351        }
352        if (LOG.isTraceEnabled()) {
353          LOG.trace("Removed " + toBeRemoved + " from cache");
354        }
355      }
356    }
357  }
358
359  /**
360   * Delete a cached location for a table, row and server. <br>
361   * Synchronized because of calls in cacheLocation which need to be executed atomically
362   */
363  public synchronized void clearCache(final TableName tableName, final byte[] row,
364    ServerName serverName) {
365    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
366
367    RegionLocations regionLocations = getCachedLocation(tableName, row);
368    if (regionLocations != null) {
369      RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
370      if (updatedLocations != regionLocations) {
371        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
372        if (updatedLocations.isEmpty()) {
373          tableLocations.remove(startKey);
374        } else {
375          tableLocations.put(startKey, updatedLocations);
376        }
377        if (metrics != null) {
378          metrics.incrMetaCacheNumClearRegion();
379        }
380        if (LOG.isTraceEnabled()) {
381          LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
382            + " mapping to server: " + serverName + " from cache");
383        }
384      }
385    }
386  }
387
388  /**
389   * Deletes the cached location of the region if necessary, based on some error from source.<br>
390   * Synchronized because of calls in cacheLocation which need to be executed atomically
391   * @param hri The region in question.
392   */
393  public synchronized void clearCache(RegionInfo hri) {
394    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
395    RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
396    if (regionLocations != null) {
397      HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
398      if (oldLocation == null) return;
399      RegionLocations updatedLocations = regionLocations.remove(oldLocation);
400      if (updatedLocations != regionLocations) {
401        if (updatedLocations.isEmpty()) {
402          tableLocations.remove(hri.getStartKey());
403        } else {
404          tableLocations.put(hri.getStartKey(), updatedLocations);
405        }
406        if (metrics != null) {
407          metrics.incrMetaCacheNumClearRegion();
408        }
409        if (LOG.isTraceEnabled()) {
410          LOG.trace("Removed " + oldLocation + " from cache");
411        }
412      }
413    }
414  }
415
416}