001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
026import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
028
029import java.io.IOException;
030import java.util.List;
031import java.util.concurrent.CompletableFuture;
032import java.util.stream.Collectors;
033import org.apache.commons.lang3.mutable.MutableInt;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterId;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.RegionLocations;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.exceptions.DeserializationException;
041import org.apache.hadoop.hbase.master.RegionState;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
045import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
052
053/**
054 * Zookeeper based registry implementation.
055 * @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default
056 *             connection mechanism as of 3.0.0. Expected to be removed in 4.0.0.
057 * @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent
058 *      ticket for details.
059 */
060@Deprecated
061@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
062class ZKConnectionRegistry implements ConnectionRegistry {
063
064  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
065
066  private static final Object WARN_LOCK = new Object();
067  private static volatile boolean NEEDS_LOG_WARN = true;
068
069  private final ReadOnlyZKClient zk;
070
071  private final ZNodePaths znodePaths;
072  private final Configuration conf;
073  private final int zkRegistryAsyncTimeout;
074  public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout";
075  public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 10000; // 10 sec
076  // User not used, but for rpc based registry we need it
077
078  ZKConnectionRegistry(Configuration conf, User ignored) {
079    this.znodePaths = new ZNodePaths(conf);
080    this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER);
081    this.conf = conf;
082    this.zkRegistryAsyncTimeout =
083      conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT);
084    if (NEEDS_LOG_WARN) {
085      synchronized (WARN_LOCK) {
086        if (NEEDS_LOG_WARN) {
087          LOG.warn(
088            "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry");
089          NEEDS_LOG_WARN = false;
090        }
091      }
092    }
093  }
094
095  private interface Converter<T> {
096    T convert(byte[] data) throws Exception;
097  }
098
099  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
100    CompletableFuture<T> future = new CompletableFuture<>();
101    addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> {
102      if (error != null) {
103        future.completeExceptionally(error);
104        return;
105      }
106      try {
107        future.complete(converter.convert(data));
108      } catch (Exception e) {
109        future.completeExceptionally(e);
110      }
111    });
112    return future;
113  }
114
115  private static String getClusterId(byte[] data) throws DeserializationException {
116    if (data == null || data.length == 0) {
117      return null;
118    }
119    data = removeMetaData(data);
120    return ClusterId.parseFrom(data).toString();
121  }
122
123  @Override
124  public CompletableFuture<String> getClusterId() {
125    return tracedFuture(
126      () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
127      "ZKConnectionRegistry.getClusterId");
128  }
129
130  ReadOnlyZKClient getZKClient() {
131    return zk;
132  }
133
134  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
135    if (data == null || data.length == 0) {
136      return null;
137    }
138    data = removeMetaData(data);
139    int prefixLen = lengthOfPBMagic();
140    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
141      data.length - prefixLen);
142  }
143
144  private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
145    CompletableFuture<RegionLocations> future) {
146    remaining.decrement();
147    if (remaining.intValue() > 0) {
148      return;
149    }
150    future.complete(new RegionLocations(locs));
151  }
152
153  private Pair<RegionState.State, ServerName>
154    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
155    RegionState.State state;
156    if (proto.hasState()) {
157      state = RegionState.State.convert(proto.getState());
158    } else {
159      state = RegionState.State.OPEN;
160    }
161    HBaseProtos.ServerName snProto = proto.getServer();
162    return Pair.newPair(state,
163      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
164  }
165
166  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
167    List<String> metaReplicaZNodes) {
168    if (metaReplicaZNodes.isEmpty()) {
169      future.completeExceptionally(new IOException("No meta znode available"));
170    }
171    HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
172    MutableInt remaining = new MutableInt(locs.length);
173    for (String metaReplicaZNode : metaReplicaZNodes) {
174      int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
175      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
176      if (replicaId == DEFAULT_REPLICA_ID) {
177        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
178          if (error != null) {
179            future.completeExceptionally(error);
180            return;
181          }
182          if (proto == null) {
183            future.completeExceptionally(new IOException("Meta znode is null"));
184            return;
185          }
186          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
187          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
188            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
189          }
190          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
191            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
192          tryComplete(remaining, locs, future);
193        });
194      } else {
195        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
196          if (future.isDone()) {
197            return;
198          }
199          if (error != null) {
200            LOG.warn("Failed to fetch " + path, error);
201            locs[replicaId] = null;
202          } else if (proto == null) {
203            LOG.warn("Meta znode for replica " + replicaId + " is null");
204            locs[replicaId] = null;
205          } else {
206            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
207            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
208              LOG.warn("Meta region for replica " + replicaId + " is in state "
209                + stateAndServerName.getFirst());
210              locs[replicaId] = null;
211            } else {
212              locs[replicaId] =
213                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
214                  stateAndServerName.getSecond());
215            }
216          }
217          tryComplete(remaining, locs, future);
218        });
219      }
220    }
221  }
222
223  @Override
224  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
225    return tracedFuture(() -> {
226      CompletableFuture<RegionLocations> future = new CompletableFuture<>();
227      addListener(
228        zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children
229          .stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
230        (metaReplicaZNodes, error) -> {
231          if (error != null) {
232            future.completeExceptionally(error);
233            return;
234          }
235          getMetaRegionLocation(future, metaReplicaZNodes);
236        });
237      return future;
238    }, "ZKConnectionRegistry.getMetaRegionLocations");
239  }
240
241  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
242    if (data == null || data.length == 0) {
243      return null;
244    }
245    data = removeMetaData(data);
246    int prefixLen = lengthOfPBMagic();
247    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
248  }
249
250  @Override
251  public CompletableFuture<ServerName> getActiveMaster() {
252    return tracedFuture(
253      () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
254        .thenApply(proto -> {
255          if (proto == null) {
256            return null;
257          }
258          HBaseProtos.ServerName snProto = proto.getMaster();
259          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
260            snProto.getStartCode());
261        }),
262      "ZKConnectionRegistry.getActiveMaster");
263  }
264
265  @Override
266  public String getConnectionString() {
267    final String serverList = zk.getConnectString();
268    final String baseZNode = znodePaths.baseZNode;
269    return serverList + ":" + baseZNode;
270  }
271
272  @Override
273  public void close() {
274    zk.close();
275  }
276}