001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 021import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; 022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 023 024import java.util.Iterator; 025import java.util.Map; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ConcurrentNavigableMap; 029import java.util.concurrent.ConcurrentSkipListMap; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.function.IntSupplier; 032import org.apache.commons.lang3.builder.ToStringBuilder; 033import org.apache.commons.lang3.builder.ToStringStyle; 034import org.apache.hadoop.hbase.ChoreService; 035import org.apache.hadoop.hbase.HRegionLocation; 036import org.apache.hadoop.hbase.ScheduledChore; 037import org.apache.hadoop.hbase.Stoppable; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.slf4j.Logger; 042import 
org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * <p>
 * CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load balancing
 * algorithm. It maintains a stale location cache for each table. Whenever a client looks up a
 * location, it first checks whether the row falls into the stale location cache. If it does, the
 * location served by the catalog replicas is considered stale and the lookup goes to the primary
 * region to fetch an up-to-date location; otherwise, a replica region or the primary region is
 * picked at random for the lookup. When a client reports a stale location through
 * {@link #onError(HRegionLocation)} (e.g. because the region server no longer serves that region),
 * the location is added to the stale location cache. The stale cache is cleaned up periodically by
 * a chore.
 * </p>
 * It follows a simple algorithm to choose a meta replica region (including primary meta) to go to:
 * <ol>
 * <li>If there is no stale location entry for the row being looked up, it randomly picks a meta
 * replica region (including primary meta) to do the lookup.</li>
 * <li>If the location from the replica region turns out to be stale, the client gets an error from
 * the region server and reports it via {@link #onError(HRegionLocation)}, which creates a
 * StaleLocationCacheEntry in this selector.</li>
 * <li>When a client does a location lookup, the stale location cache is checked first for the row;
 * if a matching, non-expired entry exists, the lookup goes to the primary meta region; otherwise,
 * step 1 is followed.</li>
 * <li>A chore periodically evicts expired entries from the stale location cache.</li>
 * </ol>
 */
class CatalogReplicaLoadBalanceSimpleSelector
  implements CatalogReplicaLoadBalanceSelector, Stoppable {
  private static final Logger LOG =
    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);

  /** How long a reported stale location is honoured before it expires. */
  private static final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
  /** Period of the chore that evicts expired stale cache entries. */
  private static final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
  /** Period of the chore that re-reads the catalog replica count. */
  private static final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute

  /**
   * StaleLocationCacheEntry is the entry created when a client reports a stale location. It is
   * keyed by the region's start key in the per-table cache and records the region's end key plus
   * the time it was reported.
   * <p>
   * This class deliberately does not override {@code equals}: conditional removals
   * ({@code ConcurrentMap.remove(key, value)}) therefore only remove the exact instance observed,
   * never a newer entry inserted for the same start key.
   */
  private static final class StaleLocationCacheEntry {
    // timestamp in milliseconds, taken from EnvironmentEdgeManager when the entry is created
    private final long timestamp;

    private final byte[] endKey;

    StaleLocationCacheEntry(final byte[] endKey) {
      this.endKey = endKey;
      timestamp = EnvironmentEdgeManager.currentTime();
    }

    public byte[] getEndKey() {
      return this.endKey;
    }

    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("endKey", endKey)
        .append("timestamp", timestamp).toString();
    }
  }

  // Per-table stale location cache: table -> (region start key -> stale entry), ordered by key so
  // floor/lower lookups find the candidate region for a row.
  private final ConcurrentMap<TableName,
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>> staleCache = new ConcurrentHashMap<>();
  private volatile int numOfReplicas;
  private final ChoreService choreService;
  private final TableName tableName;
  private final IntSupplier getNumOfReplicas;
  private volatile boolean isStopped = false;

  /**
   * Creates the selector and schedules its cache-cleanup and replica-count-refresh chores.
   * @param tableName        catalog table whose replica count is tracked (used in log messages)
   * @param choreService     service on which the two chores are scheduled
   * @param getNumOfReplicas supplier of the current replica count; returning
   *                         {@code UNINITIALIZED_NUM_OF_REPLICAS} signals a fetch failure
   */
  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
    IntSupplier getNumOfReplicas) {
    this.choreService = choreService;
    this.tableName = tableName;
    this.getNumOfReplicas = getNumOfReplicas;

    // This numOfReplicas is going to be lazy initialized.
    this.numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
    // Start chores last, after every field has been assigned, since they capture 'this'.
    this.choreService.scheduleChore(getCacheCleanupChore(this));
    this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
  }

  /**
   * When a client finds that a location it got from a catalog replica is stale, it calls this
   * method to record the location in the selector's stale cache. An existing entry for the same
   * region start key is kept as-is.
   * @param loc the location which causes exception.
   */
  @Override
  public void onError(HRegionLocation loc) {
    RegionInfo region = loc.getRegion();
    TableName table = region.getTable();
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
      computeIfAbsent(staleCache, table, () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
    byte[] startKey = region.getStartKey();
    byte[] endKey = region.getEndKey();
    if (tableCache.putIfAbsent(startKey, new StaleLocationCacheEntry(endKey)) == null) {
      LOG.debug("Add entry to stale cache for table {} with startKey {}, {}", table, startKey,
        endKey);
    }
  }

  /**
   * Select a random replica id (including the primary replica id). In case there is no replica
   * region configured, return the primary replica id.
   * @return Replica id
   */
  private int getRandomReplicaId() {
    int cachedNumOfReplicas = this.numOfReplicas;
    if (cachedNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
      // Lazy initialization. refreshCatalogReplicaCount() stores the count itself on success; do
      // not write the result back here, as that could overwrite a newer count stored concurrently
      // by the refresh chore.
      cachedNumOfReplicas = refreshCatalogReplicaCount();
    }
    // In case of no replica configured (or the count is still unknown), use the primary region.
    if (cachedNumOfReplicas <= 1) {
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    return ThreadLocalRandom.current().nextInt(cachedNumOfReplicas);
  }

  /**
   * When it looks up a location, it will call this method to find a replica region to go to. In
   * the normal case, > 99% of region locations from catalog/meta replicas will be up to date. In
   * extreme cases, such as region server crashes, it depends on how fast replication catches up.
   * @param tableName  table name it looks up
   * @param row        key it looks up.
   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
   * @return catalog replica id
   */
  @Override
  public int select(final TableName tableName, final byte[] row,
    final RegionLocateType locateType) {
    Preconditions.checkArgument(
      locateType == RegionLocateType.BEFORE || locateType == RegionLocateType.CURRENT,
      "Expected type BEFORE or CURRENT but got: %s", locateType);

    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tableName);

    // If there is no entry in StaleCache, select a random replica id.
    if (tableCache == null) {
      return getRandomReplicaId();
    }

    Map.Entry<byte[], StaleLocationCacheEntry> entry;
    boolean isEmptyStopRow = isEmptyStopRow(row);
    // Only BEFORE and CURRENT are passed in.
    if (locateType == RegionLocateType.BEFORE) {
      // Candidate: the region starting strictly before row (the last region for an empty row).
      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
    } else {
      // Candidate: the region starting at or before row.
      entry = tableCache.floorEntry(row);
    }

    // It is not in the stale cache, return a random replica id.
    if (entry == null) {
      return getRandomReplicaId();
    }

    StaleLocationCacheEntry staleEntry = entry.getValue();

    // The entry here is a possible match for the location. Check if the entry times out first as
    // long comparing is faster than comparing byte arrays (in most cases). It could remove
    // stale entries faster. If the possible match entry does not time out, it will check if
    // the entry is a match for the row passed in and select the replica id accordingly.
    if (
      (EnvironmentEdgeManager.currentTime() - staleEntry.getTimestamp())
          >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
    ) {
      LOG.debug("Entry for table {} with startKey {}, {} times out", tableName, entry.getKey(),
        staleEntry);
      // Conditional remove: only drop the exact entry we inspected. A concurrent cleanup plus
      // onError() may already have replaced it with a fresh entry, which must survive.
      tableCache.remove(entry.getKey(), staleEntry);
      return getRandomReplicaId();
    }

    byte[] endKey = staleEntry.getEndKey();

    // The following logic is borrowed from AsyncNonMetaRegionLocator.
    // An empty end key means the cached region extends to the end of the table, so it covers row.
    if (isEmptyStopRow(endKey)) {
      LOG.debug("Lookup {} goes to primary region", row);
      return RegionInfo.DEFAULT_REPLICA_ID;
    }

    if (locateType == RegionLocateType.BEFORE) {
      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
        LOG.debug("Lookup {} goes to primary meta", row);
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    } else {
      if (Bytes.compareTo(row, endKey) < 0) {
        LOG.debug("Lookup {} goes to primary meta", row);
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    }

    // Not in stale cache, return a random replica id.
    return getRandomReplicaId();
  }

  // This class implements the Stoppable interface because chores need a Stoppable object. The
  // stopped flag is recorded but not otherwise acted upon by this class.
  @Override
  public void stop(String why) {
    isStopped = true;
  }

  @Override
  public boolean isStopped() {
    return isStopped;
  }

  /**
   * Evicts every stale cache entry whose age has reached {@link #STALE_CACHE_TIMEOUT_IN_MILLISECONDS}.
   * Invoked periodically by the cache-cleanup chore.
   */
  private void cleanupReplicaStaleCache() {
    long curTimeInMillis = EnvironmentEdgeManager.currentTime();
    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = tableCache.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
        StaleLocationCacheEntry staleEntry = entry.getValue();
        // Conditional remove (not it.remove(), which removes by key only) so that an entry
        // re-added concurrently by onError() is not evicted by mistake.
        if (
          curTimeInMillis - staleEntry.getTimestamp() >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
            && tableCache.remove(entry.getKey(), staleEntry)
        ) {
          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), staleEntry);
        }
      }
    }
  }

  /**
   * Fetches the current replica count and stores it in {@link #numOfReplicas}.
   * @return the freshly fetched count, or the previously cached value (possibly
   *         {@code UNINITIALIZED_NUM_OF_REPLICAS}) if the fetch failed
   */
  private int refreshCatalogReplicaCount() {
    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
    // If the returned number of replicas is UNINITIALIZED_NUM_OF_REPLICAS, it is caused by a
    // failure to fetch the replica count. Do not update the numOfReplicas in this case.
    if (newNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
      LOG.error("Failed to fetch Table {}'s region replica count", tableName);
      return this.numOfReplicas;
    }
    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
    this.numOfReplicas = newNumOfReplicas;
    return newNumOfReplicas;
  }

  private ScheduledChore
    getCacheCleanupChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        selector.cleanupReplicaStaleCache();
      }
    };
  }

  private ScheduledChore
    getRefreshReplicaCountChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
    return new ScheduledChore("RefreshReplicaCountChore", this,
      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        selector.refreshCatalogReplicaCount();
      }
    };
  }
}