001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 028 029import java.io.IOException; 030import java.util.Collection; 031import java.util.List; 032import java.util.Map; 033import java.util.TreeMap; 034import java.util.concurrent.CompletableFuture; 035import java.util.stream.Collectors; 036import org.apache.commons.lang3.mutable.MutableInt; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.ClusterId; 039import org.apache.hadoop.hbase.HBaseInterfaceAudience; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.RegionLocations; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.exceptions.DeserializationException; 044import org.apache.hadoop.hbase.master.RegionState; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.util.Pair; 047import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 048import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 055 056/** 057 * Zookeeper based registry implementation. 058 * @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default 059 * connection mechanism as of 3.0.0. Expected to be removed in 4.0.0. 060 * @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent 061 * ticket for details. 062 */ 063@Deprecated 064@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 065class ZKConnectionRegistry implements ConnectionRegistry { 066 067 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 068 069 private static final Object WARN_LOCK = new Object(); 070 private static volatile boolean NEEDS_LOG_WARN = true; 071 072 private final ReadOnlyZKClient zk; 073 074 private final ZNodePaths znodePaths; 075 076 // User not used, but for rpc based registry we need it 077 ZKConnectionRegistry(Configuration conf, User ignored) { 078 this.znodePaths = new ZNodePaths(conf); 079 this.zk = new ReadOnlyZKClient(conf); 080 if (NEEDS_LOG_WARN) { 081 synchronized (WARN_LOCK) { 082 if (NEEDS_LOG_WARN) { 083 LOG.warn( 084 "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry"); 085 NEEDS_LOG_WARN = false; 086 } 087 } 088 } 089 } 090 091 private interface Converter<T> { 092 T convert(byte[] data) throws Exception; 093 } 094 095 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 096 CompletableFuture<T> future = new CompletableFuture<>(); 097 addListener(zk.get(path), (data, error) -> { 098 if (error != null) { 099 future.completeExceptionally(error); 100 return; 101 } 102 try { 103 future.complete(converter.convert(data)); 104 } catch (Exception e) { 105 future.completeExceptionally(e); 106 } 107 }); 108 return future; 109 } 110 111 private static String getClusterId(byte[] data) throws DeserializationException { 112 if (data == null || data.length == 0) { 113 return null; 114 } 115 data = removeMetaData(data); 116 return ClusterId.parseFrom(data).toString(); 117 } 118 119 @Override 120 public CompletableFuture<String> getClusterId() { 121 return tracedFuture( 122 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId), 123 "ZKConnectionRegistry.getClusterId"); 124 } 125 126 ReadOnlyZKClient getZKClient() { 127 return zk; 128 } 129 130 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 131 if (data == null || data.length == 0) { 132 return null; 133 } 134 data = removeMetaData(data); 135 int prefixLen = lengthOfPBMagic(); 136 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 137 data.length - prefixLen); 138 } 139 140 private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs, 141 CompletableFuture<RegionLocations> future) { 142 remaining.decrement(); 143 if (remaining.intValue() > 0) { 144 return; 145 } 146 future.complete(new RegionLocations(locs)); 147 } 148 149 private Pair<RegionState.State, ServerName> 150 getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) { 151 RegionState.State state; 152 if (proto.hasState()) { 153 state = RegionState.State.convert(proto.getState()); 154 } else { 155 state = RegionState.State.OPEN; 156 } 157 HBaseProtos.ServerName snProto = proto.getServer(); 158 return Pair.newPair(state, 159 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 160 } 161 162 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 163 List<String> metaReplicaZNodes) { 164 if (metaReplicaZNodes.isEmpty()) { 165 future.completeExceptionally(new IOException("No meta znode available")); 166 } 167 // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have 168 // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing 169 // znode for replicaId '1'. This is a transient condition. Because of this we are careful 170 // accumulating locations. We use a Map so retries overwrite rather than aggregate and the 171 // Map sorts just to be kind to further processing. The Map will retain the discontinuity on 172 // replicaIds but on completion (of the future), the Map values are passed to the 173 // RegionLocations constructor which knows how to deal with discontinuities. 174 final Map<Integer, HRegionLocation> locs = new TreeMap<>(); 175 MutableInt remaining = new MutableInt(metaReplicaZNodes.size()); 176 for (String metaReplicaZNode : metaReplicaZNodes) { 177 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 178 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 179 if (replicaId == DEFAULT_REPLICA_ID) { 180 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 181 if (error != null) { 182 future.completeExceptionally(error); 183 return; 184 } 185 if (proto == null) { 186 future.completeExceptionally(new IOException("Meta znode is null")); 187 return; 188 } 189 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 190 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 191 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 192 } 193 locs.put(replicaId, new HRegionLocation( 194 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond())); 195 tryComplete(remaining, locs.values(), future); 196 }); 197 } else { 198 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 199 if (future.isDone()) { 200 return; 201 } 202 if (error != null) { 203 LOG.warn("Failed to fetch " + path, error); 204 locs.put(replicaId, null); 205 } else if (proto == null) { 206 LOG.warn("Meta znode for replica " + replicaId + " is null"); 207 locs.put(replicaId, null); 208 } else { 209 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 210 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 211 LOG.warn("Meta region for replica " + replicaId + " is in state " 212 + stateAndServerName.getFirst()); 213 locs.put(replicaId, null); 214 } else { 215 locs.put(replicaId, 216 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 217 stateAndServerName.getSecond())); 218 } 219 } 220 tryComplete(remaining, locs.values(), future); 221 }); 222 } 223 } 224 } 225 226 @Override 227 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 228 return tracedFuture(() -> { 229 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 230 addListener( 231 zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() 232 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 233 (metaReplicaZNodes, error) -> { 234 if (error != null) { 235 future.completeExceptionally(error); 236 return; 237 } 238 getMetaRegionLocation(future, metaReplicaZNodes); 239 }); 240 return future; 241 }, "ZKConnectionRegistry.getMetaRegionLocations"); 242 } 243 244 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 245 if (data == null || data.length == 0) { 246 return null; 247 } 248 data = removeMetaData(data); 249 int prefixLen = lengthOfPBMagic(); 250 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 251 } 252 253 @Override 254 public CompletableFuture<ServerName> getActiveMaster() { 255 return tracedFuture( 256 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 257 .thenApply(proto -> { 258 if (proto == null) { 259 return null; 260 } 261 HBaseProtos.ServerName snProto = proto.getMaster(); 262 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 263 snProto.getStartCode()); 264 }), 265 "ZKConnectionRegistry.getActiveMaster"); 266 } 267 268 @Override 269 public String getConnectionString() { 270 final String serverList = zk.getConnectString(); 271 final String baseZNode = znodePaths.baseZNode; 272 return serverList + ":" + baseZNode; 273 } 274 275 @Override 276 public void close() { 277 zk.close(); 278 } 279}