001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.EnumSet; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.DelayQueue; 030import java.util.concurrent.Delayed; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.util.FutureUtils; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.KeeperException.Code; 040import org.apache.zookeeper.ZooKeeper; 041import org.apache.zookeeper.client.ZKClientConfig; 042import org.apache.zookeeper.data.Stat; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A very simple read only zookeeper implementation without watcher support. 048 */ 049@InterfaceAudience.Private 050public final class ReadOnlyZKClient implements Closeable { 051 052 private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class); 053 054 public static final String RECOVERY_RETRY = "zookeeper.recovery.retry"; 055 056 private static final int DEFAULT_RECOVERY_RETRY = 30; 057 058 public static final String RECOVERY_RETRY_INTERVAL_MILLIS = 059 "zookeeper.recovery.retry.intervalmill"; 060 061 private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000; 062 063 public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time"; 064 065 private static final int DEFAULT_KEEPALIVE_MILLIS = 60000; 066 067 private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED); 068 069 private final String connectString; 070 071 private final int sessionTimeoutMs; 072 073 private final int maxRetries; 074 075 private final int retryIntervalMs; 076 077 private final int keepAliveTimeMs; 078 079 private final ZKClientConfig zkClientConfig; 080 081 private static abstract class Task implements Delayed { 082 083 protected long time = System.nanoTime(); 084 085 public boolean needZk() { 086 return false; 087 } 088 089 public void exec(ZooKeeper zk) { 090 } 091 092 public void connectFailed(Exception e) { 093 } 094 095 public void closed(IOException e) { 096 } 097 098 @Override 099 public int compareTo(Delayed o) { 100 Task that = (Task) o; 101 int c = Long.compare(time, that.time); 102 if (c != 0) { 103 return c; 104 } 105 return Integer.compare(System.identityHashCode(this), System.identityHashCode(that)); 106 } 107 108 @Override 109 public long getDelay(TimeUnit unit) { 110 return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); 111 } 112 } 113 114 private static final Task CLOSE = new Task() { 115 }; 116 117 private final DelayQueue<Task> tasks = new DelayQueue<>(); 118 119 private final AtomicBoolean closed = new AtomicBoolean(false); 120 121 ZooKeeper zookeeper; 122 123 private int pendingRequests = 0; 124 125 private String getId() { 126 return String.format("0x%08x", System.identityHashCode(this)); 127 } 128 129 public ReadOnlyZKClient(Configuration conf) { 130 // We might use a different ZK for client access 131 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 132 if (clientZkQuorumServers != null) { 133 this.connectString = clientZkQuorumServers; 134 } else { 135 this.connectString = ZKConfig.getZKQuorumServersString(conf); 136 } 137 this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); 138 this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); 139 this.retryIntervalMs = 140 conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); 141 this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); 142 this.zkClientConfig = ZKConfig.getZKClientConfig(conf); 143 LOG.debug( 144 "Connect {} to {} with session timeout={}ms, retries={}, " 145 + "retry interval={}ms, keepAlive={}ms, zk client config={}", 146 getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs, 147 zkClientConfig); 148 Threads.setDaemonThreadRunning(new Thread(this::run), 149 "ReadOnlyZKClient-" + connectString + "@" + getId()); 150 } 151 152 private abstract class ZKTask<T> extends Task { 153 154 protected final String path; 155 156 private final CompletableFuture<T> future; 157 158 private final String operationType; 159 160 private int retries; 161 162 protected ZKTask(String path, CompletableFuture<T> future, String operationType) { 163 this.path = path; 164 this.future = future; 165 this.operationType = operationType; 166 } 167 168 protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) { 169 tasks.add(new Task() { 170 171 @Override 172 public void exec(ZooKeeper alwaysNull) { 173 pendingRequests--; 174 Code code = Code.get(rc); 175 if (code == Code.OK) { 176 future.complete(ret); 177 } else if (code == Code.NONODE) { 178 if (errorIfNoNode) { 179 future.completeExceptionally(KeeperException.create(code, path)); 180 } else { 181 future.complete(ret); 182 } 183 } else if (FAIL_FAST_CODES.contains(code)) { 184 future.completeExceptionally(KeeperException.create(code, path)); 185 } else { 186 if (code == Code.SESSIONEXPIRED) { 187 LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); 188 try { 189 zk.close(); 190 } catch (InterruptedException e) { 191 // Restore interrupt status 192 Thread.currentThread().interrupt(); 193 } 194 } 195 if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { 196 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), 197 connectString, operationType, path, code, ZKTask.this.retries); 198 tasks.add(ZKTask.this); 199 } else { 200 LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), 201 connectString, operationType, path, code, ZKTask.this.retries); 202 future.completeExceptionally(KeeperException.create(code, path)); 203 } 204 } 205 } 206 207 @Override 208 public void closed(IOException e) { 209 // It may happen that a request is succeeded and the onComplete has been called and pushed 210 // us into the task queue, but before we get called a close is called and here we will 211 // fail the request, although it is succeeded actually. 212 // This is not a perfect solution but anyway, it is better than hang the requests for 213 // ever, and also acceptable as if you close the zk client before actually getting the 214 // response then a failure is always possible. 215 future.completeExceptionally(e); 216 } 217 }); 218 } 219 220 @Override 221 public boolean needZk() { 222 return true; 223 } 224 225 protected abstract void doExec(ZooKeeper zk); 226 227 @Override 228 public final void exec(ZooKeeper zk) { 229 pendingRequests++; 230 doExec(zk); 231 } 232 233 public boolean delay(long intervalMs, int maxRetries) { 234 if (retries >= maxRetries) { 235 return false; 236 } 237 retries++; 238 time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs); 239 return true; 240 } 241 242 @Override 243 public void connectFailed(Exception e) { 244 if (delay(retryIntervalMs, maxRetries)) { 245 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), 246 connectString, operationType, path, retries, e); 247 tasks.add(this); 248 } else { 249 LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), 250 connectString, operationType, path, retries, e); 251 future.completeExceptionally(e); 252 } 253 } 254 255 @Override 256 public void closed(IOException e) { 257 future.completeExceptionally(e); 258 } 259 } 260 261 public CompletableFuture<byte[]> get(String path) { 262 if (closed.get()) { 263 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 264 } 265 CompletableFuture<byte[]> future = new CompletableFuture<>(); 266 tasks.add(new ZKTask<byte[]>(path, future, "get") { 267 268 @Override 269 protected void doExec(ZooKeeper zk) { 270 zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), 271 null); 272 } 273 }); 274 return future; 275 } 276 277 public CompletableFuture<Stat> exists(String path) { 278 if (closed.get()) { 279 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 280 } 281 CompletableFuture<Stat> future = new CompletableFuture<>(); 282 tasks.add(new ZKTask<Stat>(path, future, "exists") { 283 284 @Override 285 protected void doExec(ZooKeeper zk) { 286 zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); 287 } 288 }); 289 return future; 290 } 291 292 public CompletableFuture<List<String>> list(String path) { 293 if (closed.get()) { 294 return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); 295 } 296 CompletableFuture<List<String>> future = new CompletableFuture<>(); 297 tasks.add(new ZKTask<List<String>>(path, future, "list") { 298 299 @Override 300 protected void doExec(ZooKeeper zk) { 301 zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true), 302 null); 303 } 304 }); 305 return future; 306 } 307 308 private void closeZk() { 309 if (zookeeper != null) { 310 try { 311 zookeeper.close(); 312 } catch (InterruptedException e) { 313 // Restore interrupt status 314 Thread.currentThread().interrupt(); 315 } 316 zookeeper = null; 317 } 318 } 319 320 private ZooKeeper getZk() throws IOException { 321 // may be closed when session expired 322 if (zookeeper == null || !zookeeper.getState().isAlive()) { 323 zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> { 324 }, zkClientConfig); 325 } 326 return zookeeper; 327 } 328 329 private void run() { 330 for (;;) { 331 Task task; 332 try { 333 task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS); 334 } catch (InterruptedException e) { 335 continue; 336 } 337 if (task == CLOSE) { 338 break; 339 } 340 if (task == null) { 341 if (pendingRequests == 0) { 342 LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)", 343 getId(), connectString, keepAliveTimeMs); 344 closeZk(); 345 } 346 continue; 347 } 348 if (!task.needZk()) { 349 task.exec(null); 350 } else { 351 ZooKeeper zk; 352 try { 353 zk = getZk(); 354 } catch (Exception e) { 355 task.connectFailed(e); 356 continue; 357 } 358 task.exec(zk); 359 } 360 } 361 closeZk(); 362 DoNotRetryIOException error = new DoNotRetryIOException("Client already closed"); 363 Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error)); 364 tasks.clear(); 365 } 366 367 @Override 368 public void close() { 369 if (closed.compareAndSet(false, true)) { 370 LOG.debug("Close zookeeper connection {} to {}", getId(), connectString); 371 tasks.add(CLOSE); 372 } 373 } 374 375 public String getConnectString() { 376 return connectString; 377 } 378}