/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
 * table znodes. Clients are expected to retry if the meta information is stale. This class is
 * thread-safe (a single instance of this class can be shared by multiple threads without race
 * conditions).
 */
@InterfaceAudience.Private
public class MetaRegionLocationCache extends ZKListener {

  private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);

  /**
   * Maximum number of times we retry when ZK operation times out.
   */
  private static final int MAX_ZK_META_FETCH_RETRIES = 10;
  /**
   * Sleep interval ms between ZK operation retries.
   */
  private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;
  /**
   * Upper bound (ms) passed to the retry factory used for the initial background load.
   */
  private static final int SLEEP_INTERVAL_MS_MAX = 10000;
  /**
   * Factory for the bounded retry counters used by listener-driven updates.
   */
  private final RetryCounterFactory retryCounterFactory =
    new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);

  /**
   * Cached meta region locations indexed by replica ID. CopyOnWriteArrayMap ensures synchronization
   * during updates and a consistent snapshot during client requests. Even though
   * CopyOnWriteArrayMap copies the data structure for every write, that should be OK since the size
   * of the list is often small and mutations are not too often and we do not need to block client
   * requests while mutations are in progress.
   */
  private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations;

  /** Kind of znode event that triggered a cache update. */
  private enum ZNodeOpType {
    INIT,
    CREATED,
    CHANGED,
    DELETED
  }

  /**
   * Creates the cache, registers it as a listener on the given watcher and starts a daemon thread
   * that loads the initial snapshot of meta locations from ZK.
   * @param zkWatcher watcher used for all ZK reads and for listener registration.
   */
  public MetaRegionLocationCache(ZKWatcher zkWatcher) {
    super(zkWatcher);
    cachedMetaLocations = new CopyOnWriteArrayMap<>();
    watcher.registerListener(this);
    // Populate the initial snapshot of data from meta znodes.
    // This is needed because stand-by masters can potentially start after the initial znode
    // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
    // are established. Subsequent updates are handled by the registered listener. Also, this runs
    // in a separate thread in the background to not block master init.
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
    RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE,
      SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
    threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT))
      .start();
  }

  /**
   * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
   * a watcher on base znode to check for any CREATE/DELETE events on the children.
   * @param retryCounter controls the number of retries and sleep between retries.
   * @param opType       operation type forwarded to {@link #updateMetaLocation(String, ZNodeOpType)}
   *                     for every meta znode that is found.
   */
  private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
    TraceUtil.trace(() -> {
      List<String> znodes = null;
      while (retryCounter.shouldRetry()) {
        try {
          znodes = watcher.getMetaReplicaNodesAndWatchChildren();
          break;
        } catch (KeeperException ke) {
          LOG.debug("Error populating initial meta locations", ke);
          if (!retryCounter.shouldRetry()) {
            // Retries exhausted and watchers not set. This is not a desirable state since the cache
            // could remain stale forever. Propagate the exception.
            watcher.abort("Error populating meta locations", ke);
            return;
          }
          try {
            retryCounter.sleepUntilNextRetry();
          } catch (InterruptedException ie) {
            LOG.error("Interrupted while loading meta locations from ZK", ie);
            // Restore the interrupt status so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
      if (znodes == null || znodes.isEmpty()) {
        // No meta znodes exist at this point but we registered a watcher on the base znode to
        // listen for updates. They will be handled via nodeChildrenChanged().
        return;
      }
      if (znodes.size() == cachedMetaLocations.size()) {
        // No new meta znodes got added.
        return;
      }
      for (String znode : znodes) {
        String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
        updateMetaLocation(path, opType);
      }
    }, "MetaRegionLocationCache.loadMetaLocationsFromZk");
  }

  /**
   * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for future
   * updates.
   * @param replicaId ReplicaID of the region.
   * @return HRegionLocation for the meta replica.
   * @throws KeeperException if there is any issue fetching/parsing the serialized data.
   */
  private HRegionLocation getMetaRegionLocation(int replicaId) throws KeeperException {
    RegionState metaRegionState;
    try {
      byte[] data =
        ZKUtil.getDataAndWatch(watcher, watcher.getZNodePaths().getZNodeForReplica(replicaId));
      metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
    } catch (DeserializationException e) {
      // Callers only deal with KeeperException, so parse failures are converted to one.
      throw ZKUtil.convert(e);
    }
    return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
  }

  /**
   * Refreshes the cached location for the meta replica identified by the given znode path. Paths
   * that are not meta znodes are ignored. If no location could be obtained (the znode is gone or
   * the retries were exhausted) the cached entry for that replica is removed; if the thread is
   * interrupted while waiting to retry, the cache is left untouched.
   * @param path   znode path the event was raised for.
   * @param opType kind of event that triggered the update.
   */
  private void updateMetaLocation(String path, ZNodeOpType opType) {
    if (!isValidMetaPath(path)) {
      return;
    }
    LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
    RetryCounter retryCounter = retryCounterFactory.create();
    HRegionLocation location = null;
    while (retryCounter.shouldRetry()) {
      try {
        if (opType == ZNodeOpType.DELETED) {
          if (!ZKUtil.watchAndCheckExists(watcher, path)) {
            // The path does not exist, we've set the watcher and we can break for now.
            break;
          }
          // If it is a transient error and the node appears right away, we fetch the
          // latest meta state.
        }
        location = getMetaRegionLocation(replicaId);
        break;
      } catch (KeeperException e) {
        LOG.debug("Error getting meta location for path {}", path, e);
        if (!retryCounter.shouldRetry()) {
          LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
          break;
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          // Log like loadMetaLocationsFromZk() does, then restore the interrupt status.
          LOG.error("Interrupted while updating meta location for path {}", path, ie);
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
    if (location == null) {
      cachedMetaLocations.remove(replicaId);
      return;
    }
    cachedMetaLocations.put(replicaId, location);
  }

  /**
   * Returns the cached locations of the meta replica(s).
   * @return the list of HRegionLocations for the meta replica(s), or an empty Optional (never
   *         {@code null}) if the cache is empty.
   */
  public Optional<List<HRegionLocation>> getMetaRegionLocations() {
    // NOTE(review): java.util.SortedMap#firstKey() is specified to throw NoSuchElementException on
    // an empty map; this call relies on CopyOnWriteArrayMap tolerating an empty map instead, as the
    // isEmpty() check below implies -- confirm against CopyOnWriteArrayMap.
    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
      cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
    if (snapshot.isEmpty()) {
      // This could be possible if the master has not successfully initialized yet or meta region
      // is stuck in some weird state.
      return Optional.empty();
    }
    List<HRegionLocation> result = new ArrayList<>();
    // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
    // ArrayValueCollection does not implement toArray().
    snapshot.values().forEach(result::add);
    return Optional.of(result);
  }

  /**
   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
   * interested in changes to meta znodes.
   */
  private boolean isValidMetaPath(String path) {
    return watcher.getZNodePaths().isMetaZNodePath(path);
  }

  @Override
  public void nodeCreated(String path) {
    updateMetaLocation(path, ZNodeOpType.CREATED);
  }

  @Override
  public void nodeDeleted(String path) {
    updateMetaLocation(path, ZNodeOpType.DELETED);
  }

  @Override
  public void nodeDataChanged(String path) {
    updateMetaLocation(path, ZNodeOpType.CHANGED);
  }

  @Override
  public void nodeChildrenChanged(String path) {
    // Only children of the base znode can be meta replica znodes.
    if (!path.equals(watcher.getZNodePaths().baseZNode)) {
      return;
    }
    loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
  }
}