001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.EnumSet;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.Code;
040import org.apache.zookeeper.ZooKeeper;
041import org.apache.zookeeper.client.ZKClientConfig;
042import org.apache.zookeeper.data.Stat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
047import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
048
049/**
050 * A very simple read only zookeeper implementation without watcher support.
051 */
052@InterfaceAudience.Private
053public final class ReadOnlyZKClient implements Closeable {
054
055  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);
056
057  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
058
059  private static final int DEFAULT_RECOVERY_RETRY = 30;
060
061  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
062    "zookeeper.recovery.retry.intervalmill";
063
064  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
065
066  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
067
068  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
069
070  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
071
072  private final String connectString;
073
074  private final int sessionTimeoutMs;
075
076  private final int maxRetries;
077
078  private final int retryIntervalMs;
079
080  private final int keepAliveTimeMs;
081
082  private HashedWheelTimer retryTimer;
083
084  private final ZKClientConfig zkClientConfig;
085
086  private static abstract class Task implements Delayed {
087
088    protected long time = System.nanoTime();
089
090    public boolean needZk() {
091      return false;
092    }
093
094    public void exec(ZooKeeper zk) {
095    }
096
097    public void connectFailed(Exception e) {
098    }
099
100    public void closed(IOException e) {
101    }
102
103    @Override
104    public int compareTo(Delayed o) {
105      Task that = (Task) o;
106      int c = Long.compare(time, that.time);
107      if (c != 0) {
108        return c;
109      }
110      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
111    }
112
113    @Override
114    public long getDelay(TimeUnit unit) {
115      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
116    }
117  }
118
119  private static final Task CLOSE = new Task() {
120  };
121
122  private final DelayQueue<Task> tasks = new DelayQueue<>();
123
124  private final AtomicBoolean closed = new AtomicBoolean(false);
125
126  ZooKeeper zookeeper;
127
128  private int pendingRequests = 0;
129
130  private String getId() {
131    return String.format("0x%08x", System.identityHashCode(this));
132  }
133
134  public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) {
135    // We might use a different ZK for client access
136    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
137    if (clientZkQuorumServers != null) {
138      this.connectString = clientZkQuorumServers;
139    } else {
140      this.connectString = ZKConfig.getZKQuorumServersString(conf);
141    }
142    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
143    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
144    this.retryIntervalMs =
145      conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
146    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
147    this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
148    this.retryTimer = retryTimer;
149    LOG.debug(
150      "Connect {} to {} with session timeout={}ms, retries={}, "
151        + "retry interval={}ms, keepAlive={}ms, zk client config={}",
152      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs,
153      zkClientConfig);
154    Threads.setDaemonThreadRunning(new Thread(this::run),
155      "ReadOnlyZKClient-" + connectString + "@" + getId());
156  }
157
158  private abstract class ZKTask<T> extends Task {
159
160    protected final String path;
161
162    private final CompletableFuture<T> future;
163
164    private final String operationType;
165
166    private int retries;
167
168    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
169      this.path = path;
170      this.future = future;
171      this.operationType = operationType;
172    }
173
174    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
175      tasks.add(new Task() {
176
177        @Override
178        public void exec(ZooKeeper alwaysNull) {
179          pendingRequests--;
180          Code code = Code.get(rc);
181          if (code == Code.OK) {
182            future.complete(ret);
183          } else if (code == Code.NONODE) {
184            if (errorIfNoNode) {
185              future.completeExceptionally(KeeperException.create(code, path));
186            } else {
187              future.complete(ret);
188            }
189          } else if (FAIL_FAST_CODES.contains(code)) {
190            future.completeExceptionally(KeeperException.create(code, path));
191          } else {
192            if (code == Code.SESSIONEXPIRED) {
193              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
194              try {
195                zk.close();
196              } catch (InterruptedException e) {
197                // Restore interrupt status
198                Thread.currentThread().interrupt();
199              }
200            }
201            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
202              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
203                connectString, operationType, path, code, ZKTask.this.retries);
204              tasks.add(ZKTask.this);
205            } else {
206              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
207                connectString, operationType, path, code, ZKTask.this.retries);
208              future.completeExceptionally(KeeperException.create(code, path));
209            }
210          }
211        }
212
213        @Override
214        public void closed(IOException e) {
215          // It may happen that a request is succeeded and the onComplete has been called and pushed
216          // us into the task queue, but before we get called a close is called and here we will
217          // fail the request, although it is succeeded actually.
218          // This is not a perfect solution but anyway, it is better than hang the requests for
219          // ever, and also acceptable as if you close the zk client before actually getting the
220          // response then a failure is always possible.
221          future.completeExceptionally(e);
222        }
223      });
224    }
225
226    @Override
227    public boolean needZk() {
228      return true;
229    }
230
231    protected abstract void doExec(ZooKeeper zk);
232
233    @Override
234    public final void exec(ZooKeeper zk) {
235      pendingRequests++;
236      doExec(zk);
237    }
238
239    public boolean delay(long intervalMs, int maxRetries) {
240      if (retries >= maxRetries) {
241        return false;
242      }
243      retries++;
244      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
245      return true;
246    }
247
248    @Override
249    public void connectFailed(Exception e) {
250      if (delay(retryIntervalMs, maxRetries)) {
251        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
252          connectString, operationType, path, retries, e);
253        tasks.add(this);
254      } else {
255        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
256          connectString, operationType, path, retries, e);
257        future.completeExceptionally(e);
258      }
259    }
260
261    @Override
262    public void closed(IOException e) {
263      future.completeExceptionally(e);
264    }
265  }
266
267  private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture<?> future,
268    final String api) {
269    return timeout -> {
270      if (!future.isDone()) {
271        future.completeExceptionally(new DoNotRetryIOException(
272          "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms"));
273      }
274    };
275  }
276
277  public CompletableFuture<byte[]> get(final String path, final long timeoutMs) {
278    CompletableFuture<byte[]> future = get(path);
279    TimerTask timerTask = getTimerTask(timeoutMs, future, "GET");
280    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
281    return future;
282  }
283
284  public CompletableFuture<byte[]> get(String path) {
285    if (closed.get()) {
286      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
287    }
288    CompletableFuture<byte[]> future = new CompletableFuture<>();
289    tasks.add(new ZKTask<byte[]>(path, future, "get") {
290
291      @Override
292      protected void doExec(ZooKeeper zk) {
293        zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
294          null);
295      }
296    });
297    return future;
298  }
299
300  public CompletableFuture<Stat> exists(String path, long timeoutMs) {
301    CompletableFuture<Stat> future = exists(path);
302    TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS");
303    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
304    return future;
305  }
306
307  public CompletableFuture<Stat> exists(String path) {
308    if (closed.get()) {
309      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
310    }
311    CompletableFuture<Stat> future = new CompletableFuture<>();
312    tasks.add(new ZKTask<Stat>(path, future, "exists") {
313
314      @Override
315      protected void doExec(ZooKeeper zk) {
316        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
317      }
318    });
319    return future;
320  }
321
322  public CompletableFuture<List<String>> list(String path, long timeoutMs) {
323    CompletableFuture<List<String>> future = list(path);
324    TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST");
325    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
326    return future;
327  }
328
329  public CompletableFuture<List<String>> list(String path) {
330    if (closed.get()) {
331      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
332    }
333    CompletableFuture<List<String>> future = new CompletableFuture<>();
334    tasks.add(new ZKTask<List<String>>(path, future, "list") {
335
336      @Override
337      protected void doExec(ZooKeeper zk) {
338        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
339          null);
340      }
341    });
342    return future;
343  }
344
345  private void closeZk() {
346    if (zookeeper != null) {
347      try {
348        zookeeper.close();
349      } catch (InterruptedException e) {
350        // Restore interrupt status
351        Thread.currentThread().interrupt();
352      }
353      zookeeper = null;
354    }
355  }
356
357  private ZooKeeper getZk() throws IOException {
358    // may be closed when session expired
359    if (zookeeper == null || !zookeeper.getState().isAlive()) {
360      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
361      }, zkClientConfig);
362    }
363    return zookeeper;
364  }
365
366  private void run() {
367    for (;;) {
368      Task task;
369      try {
370        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
371      } catch (InterruptedException e) {
372        continue;
373      }
374      if (task == CLOSE) {
375        break;
376      }
377      if (task == null) {
378        if (pendingRequests == 0) {
379          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
380            getId(), connectString, keepAliveTimeMs);
381          closeZk();
382        }
383        continue;
384      }
385      if (!task.needZk()) {
386        task.exec(null);
387      } else {
388        ZooKeeper zk;
389        try {
390          zk = getZk();
391        } catch (Exception e) {
392          task.connectFailed(e);
393          continue;
394        }
395        task.exec(zk);
396      }
397    }
398    closeZk();
399    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
400    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
401    tasks.clear();
402  }
403
404  @Override
405  public void close() {
406    if (closed.compareAndSet(false, true)) {
407      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
408      tasks.add(CLOSE);
409    }
410  }
411
412  public String getConnectString() {
413    return connectString;
414  }
415}