001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
026import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
028
029import java.io.IOException;
030import java.util.Collection;
031import java.util.List;
032import java.util.Map;
033import java.util.TreeMap;
034import java.util.concurrent.CompletableFuture;
035import java.util.stream.Collectors;
036import org.apache.commons.lang3.mutable.MutableInt;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ClusterId;
039import org.apache.hadoop.hbase.HBaseInterfaceAudience;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.RegionLocations;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.exceptions.DeserializationException;
044import org.apache.hadoop.hbase.master.RegionState;
045import org.apache.hadoop.hbase.security.User;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
048import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
055
056/**
057 * Zookeeper based registry implementation.
058 * @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default
059 *             connection mechanism as of 3.0.0. Expected to be removed in 4.0.0.
060 * @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent
061 *      ticket for details.
062 */
063@Deprecated
064@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
065class ZKConnectionRegistry implements ConnectionRegistry {
066
067  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
068
069  private static final Object WARN_LOCK = new Object();
070  private static volatile boolean NEEDS_LOG_WARN = true;
071
072  private final ReadOnlyZKClient zk;
073
074  private final ZNodePaths znodePaths;
075
076  // User not used, but for rpc based registry we need it
077  ZKConnectionRegistry(Configuration conf, User ignored) {
078    this.znodePaths = new ZNodePaths(conf);
079    this.zk = new ReadOnlyZKClient(conf);
080    if (NEEDS_LOG_WARN) {
081      synchronized (WARN_LOCK) {
082        if (NEEDS_LOG_WARN) {
083          LOG.warn(
084            "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry");
085          NEEDS_LOG_WARN = false;
086        }
087      }
088    }
089  }
090
091  private interface Converter<T> {
092    T convert(byte[] data) throws Exception;
093  }
094
095  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
096    CompletableFuture<T> future = new CompletableFuture<>();
097    addListener(zk.get(path), (data, error) -> {
098      if (error != null) {
099        future.completeExceptionally(error);
100        return;
101      }
102      try {
103        future.complete(converter.convert(data));
104      } catch (Exception e) {
105        future.completeExceptionally(e);
106      }
107    });
108    return future;
109  }
110
111  private static String getClusterId(byte[] data) throws DeserializationException {
112    if (data == null || data.length == 0) {
113      return null;
114    }
115    data = removeMetaData(data);
116    return ClusterId.parseFrom(data).toString();
117  }
118
119  @Override
120  public CompletableFuture<String> getClusterId() {
121    return tracedFuture(
122      () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
123      "ZKConnectionRegistry.getClusterId");
124  }
125
126  ReadOnlyZKClient getZKClient() {
127    return zk;
128  }
129
130  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
131    if (data == null || data.length == 0) {
132      return null;
133    }
134    data = removeMetaData(data);
135    int prefixLen = lengthOfPBMagic();
136    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
137      data.length - prefixLen);
138  }
139
140  private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs,
141    CompletableFuture<RegionLocations> future) {
142    remaining.decrement();
143    if (remaining.intValue() > 0) {
144      return;
145    }
146    future.complete(new RegionLocations(locs));
147  }
148
149  private Pair<RegionState.State, ServerName>
150    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
151    RegionState.State state;
152    if (proto.hasState()) {
153      state = RegionState.State.convert(proto.getState());
154    } else {
155      state = RegionState.State.OPEN;
156    }
157    HBaseProtos.ServerName snProto = proto.getServer();
158    return Pair.newPair(state,
159      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
160  }
161
162  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
163    List<String> metaReplicaZNodes) {
164    if (metaReplicaZNodes.isEmpty()) {
165      future.completeExceptionally(new IOException("No meta znode available"));
166    }
167    // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have
168    // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing
169    // znode for replicaId '1'. This is a transient condition. Because of this we are careful
170    // accumulating locations. We use a Map so retries overwrite rather than aggregate and the
171    // Map sorts just to be kind to further processing. The Map will retain the discontinuity on
172    // replicaIds but on completion (of the future), the Map values are passed to the
173    // RegionLocations constructor which knows how to deal with discontinuities.
174    final Map<Integer, HRegionLocation> locs = new TreeMap<>();
175    MutableInt remaining = new MutableInt(metaReplicaZNodes.size());
176    for (String metaReplicaZNode : metaReplicaZNodes) {
177      int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
178      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
179      if (replicaId == DEFAULT_REPLICA_ID) {
180        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
181          if (error != null) {
182            future.completeExceptionally(error);
183            return;
184          }
185          if (proto == null) {
186            future.completeExceptionally(new IOException("Meta znode is null"));
187            return;
188          }
189          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
190          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
191            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
192          }
193          locs.put(replicaId, new HRegionLocation(
194            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()));
195          tryComplete(remaining, locs.values(), future);
196        });
197      } else {
198        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
199          if (future.isDone()) {
200            return;
201          }
202          if (error != null) {
203            LOG.warn("Failed to fetch " + path, error);
204            locs.put(replicaId, null);
205          } else if (proto == null) {
206            LOG.warn("Meta znode for replica " + replicaId + " is null");
207            locs.put(replicaId, null);
208          } else {
209            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
210            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
211              LOG.warn("Meta region for replica " + replicaId + " is in state "
212                + stateAndServerName.getFirst());
213              locs.put(replicaId, null);
214            } else {
215              locs.put(replicaId,
216                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
217                  stateAndServerName.getSecond()));
218            }
219          }
220          tryComplete(remaining, locs.values(), future);
221        });
222      }
223    }
224  }
225
226  @Override
227  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
228    return tracedFuture(() -> {
229      CompletableFuture<RegionLocations> future = new CompletableFuture<>();
230      addListener(
231        zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
232          .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
233        (metaReplicaZNodes, error) -> {
234          if (error != null) {
235            future.completeExceptionally(error);
236            return;
237          }
238          getMetaRegionLocation(future, metaReplicaZNodes);
239        });
240      return future;
241    }, "ZKConnectionRegistry.getMetaRegionLocations");
242  }
243
244  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
245    if (data == null || data.length == 0) {
246      return null;
247    }
248    data = removeMetaData(data);
249    int prefixLen = lengthOfPBMagic();
250    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
251  }
252
253  @Override
254  public CompletableFuture<ServerName> getActiveMaster() {
255    return tracedFuture(
256      () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
257        .thenApply(proto -> {
258          if (proto == null) {
259            return null;
260          }
261          HBaseProtos.ServerName snProto = proto.getMaster();
262          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
263            snProto.getStartCode());
264        }),
265      "ZKConnectionRegistry.getActiveMaster");
266  }
267
268  @Override
269  public String getConnectionString() {
270    final String serverList = zk.getConnectString();
271    final String baseZNode = znodePaths.baseZNode;
272    return serverList + ":" + baseZNode;
273  }
274
275  @Override
276  public void close() {
277    zk.close();
278  }
279}