001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.AuthUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ZooKeeperConnectionException; 036import org.apache.hadoop.hbase.security.Superusers; 037import org.apache.hadoop.hbase.trace.TraceUtil; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.zookeeper.KeeperException; 043import org.apache.zookeeper.WatchedEvent; 044import org.apache.zookeeper.Watcher; 045import org.apache.zookeeper.ZooDefs.Ids; 046import org.apache.zookeeper.ZooDefs.Perms; 047import org.apache.zookeeper.data.ACL; 048import org.apache.zookeeper.data.Id; 049import org.apache.zookeeper.data.Stat; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055/** 056 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated for each Master, 057 * RegionServer, and client process. 058 * <p> 059 * This is the only class that implements {@link Watcher}. Other internal classes which need to be 060 * notified of ZooKeeper events must register with the local instance of this watcher via 061 * {@link #registerListener}. 062 * <p> 063 * This class also holds and manages the connection to ZooKeeper. Code to deal with connection 064 * related events and exceptions are handled here. 065 */ 066@InterfaceAudience.Private 067public class ZKWatcher implements Watcher, Abortable, Closeable { 068 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 069 070 // Identifier for this watcher (for logging only). It is made of the prefix 071 // passed on construction and the zookeeper sessionid. 072 private final String prefix; 073 private String identifier; 074 075 // zookeeper quorum 076 private final String quorum; 077 078 // zookeeper connection 079 private final RecoverableZooKeeper recoverableZooKeeper; 080 081 // abortable in case of zk failure 082 protected Abortable abortable; 083 // Used if abortable is null 084 private boolean aborted = false; 085 086 private final ZNodePaths znodePaths; 087 088 // listeners to be notified 089 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 090 091 // Single threaded executor pool that processes event notifications from Zookeeper. Events are 092 // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do 093 // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. 094 // EventThread internally runs a single while loop to serially process all the events. When events 095 // are processed by the listeners in the same thread, that blocks the EventThread from processing 096 // subsequent events. Processing events in a separate thread frees up the event thread to continue 097 // and further prevents deadlocks if the process method itself makes other zookeeper calls. 098 // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the 099 // requests using a single while loop and hence there is no performance degradation. 100 private final ExecutorService zkEventProcessor = Executors 101 .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d") 102 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 103 104 private final Configuration conf; 105 106 private final long zkSyncTimeout; 107 108 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 109 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 110 111 /** 112 * Instantiate a ZooKeeper connection and watcher. 113 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 114 * this instance. Use null for default. 115 * @throws IOException if the connection to ZooKeeper fails 116 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 117 */ 118 public ZKWatcher(Configuration conf, String identifier, Abortable abortable) 119 throws ZooKeeperConnectionException, IOException { 120 this(conf, identifier, abortable, false); 121 } 122 123 /** 124 * Instantiate a ZooKeeper connection and watcher. 125 * @param conf the configuration to use 126 * @param identifier string that is passed to RecoverableZookeeper to be used as 127 * identifier for this instance. Use null for default. 128 * @param abortable Can be null if there is on error there is no host to abort: e.g. 129 * client context. 130 * @param canCreateBaseZNode true if a base ZNode can be created 131 * @throws IOException if the connection to ZooKeeper fails 132 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 133 */ 134 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 135 boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { 136 this(conf, identifier, abortable, canCreateBaseZNode, false); 137 } 138 139 /** 140 * Instantiate a ZooKeeper connection and watcher. 141 * @param conf the configuration to use 142 * @param identifier string that is passed to RecoverableZookeeper to be used as 143 * identifier for this instance. Use null for default. 144 * @param abortable Can be null if there is on error there is no host to abort: e.g. 145 * client context. 146 * @param canCreateBaseZNode true if a base ZNode can be created 147 * @param clientZK whether this watcher is set to access client ZK 148 * @throws IOException if the connection to ZooKeeper fails 149 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 150 * ZNodes 151 */ 152 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 153 boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException { 154 this.conf = conf; 155 if (clientZK) { 156 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 157 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 158 if (clientZkQuorumServers != null) { 159 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 160 // Don't allow same settings to avoid dead loop when master trying 161 // to sync meta information from server ZK to client ZK 162 throw new IllegalArgumentException( 163 "The quorum settings for client ZK should be different from those for server"); 164 } 165 this.quorum = clientZkQuorumServers; 166 } else { 167 this.quorum = serverZkQuorumServers; 168 } 169 } else { 170 this.quorum = ZKConfig.getZKQuorumServersString(conf); 171 } 172 this.prefix = identifier; 173 // Identifier will get the sessionid appended later below down when we 174 // handle the syncconnect event. 175 this.identifier = identifier + "0x0"; 176 this.abortable = abortable; 177 this.znodePaths = new ZNodePaths(conf); 178 PendingWatcher pendingWatcher = new PendingWatcher(); 179 this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, 180 identifier, ZKConfig.getZKClientConfig(conf)); 181 pendingWatcher.prepare(this); 182 if (canCreateBaseZNode) { 183 try { 184 createBaseZNodes(); 185 } catch (ZooKeeperConnectionException zce) { 186 try { 187 this.recoverableZooKeeper.close(); 188 } catch (InterruptedException ie) { 189 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 190 Thread.currentThread().interrupt(); 191 } 192 throw zce; 193 } 194 } 195 this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, 196 HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); 197 } 198 199 public List<ACL> createACL(String node) { 200 return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration())); 201 } 202 203 public List<ACL> createACL(String node, boolean isSecureZooKeeper) { 204 if (!node.startsWith(getZNodePaths().baseZNode)) { 205 return Ids.OPEN_ACL_UNSAFE; 206 } 207 if (isSecureZooKeeper) { 208 ArrayList<ACL> acls = new ArrayList<>(); 209 // add permission to hbase supper user 210 String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY); 211 String hbaseUser = null; 212 try { 213 hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 214 } catch (IOException e) { 215 LOG.debug("Could not acquire current User.", e); 216 } 217 if (superUsers != null) { 218 List<String> groups = new ArrayList<>(); 219 for (String user : superUsers) { 220 if (AuthUtil.isGroupPrincipal(user)) { 221 // TODO: Set node ACL for groups when ZK supports this feature 222 groups.add(user); 223 } else { 224 if (!user.equals(hbaseUser)) { 225 acls.add(new ACL(Perms.ALL, new Id("sasl", user))); 226 } 227 } 228 } 229 if (!groups.isEmpty()) { 230 LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this " 231 + "feature presently.", groups); 232 } 233 } 234 // Certain znodes are accessed directly by the client, 235 // so they must be readable by non-authenticated clients 236 if (getZNodePaths().isClientReadable(node)) { 237 acls.addAll(Ids.CREATOR_ALL_ACL); 238 acls.addAll(Ids.READ_ACL_UNSAFE); 239 } else { 240 acls.addAll(Ids.CREATOR_ALL_ACL); 241 } 242 return acls; 243 } else { 244 return Ids.OPEN_ACL_UNSAFE; 245 } 246 } 247 248 private void createBaseZNodes() throws ZooKeeperConnectionException { 249 try { 250 // Create all the necessary "directories" of znodes 251 ZKUtil.createWithParents(this, znodePaths.baseZNode); 252 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 253 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 254 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 255 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 256 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 257 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 258 } catch (KeeperException e) { 259 throw new ZooKeeperConnectionException( 260 prefix("Unexpected KeeperException creating base node"), e); 261 } 262 } 263 264 /** 265 * On master start, we check the znode ACLs under the root directory and set the ACLs properly if 266 * needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed so 267 * that the existing znodes created with open permissions are now changed with restrictive perms. 268 */ 269 public void checkAndSetZNodeAcls() { 270 if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) { 271 LOG.info("not a secure deployment, proceeding"); 272 return; 273 } 274 275 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 276 // correct. 277 try { 278 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 279 280 if (!isBaseZnodeAclSetup(actualAcls)) { 281 LOG.info("setting znode ACLs"); 282 setZnodeAclsRecursive(znodePaths.baseZNode); 283 } 284 } catch (KeeperException.NoNodeException nne) { 285 return; 286 } catch (InterruptedException ie) { 287 interruptedExceptionNoThrow(ie, false); 288 } catch (IOException | KeeperException e) { 289 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 290 } 291 } 292 293 /** 294 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs will 295 * be set last in case the master fails in between. 296 * @param znode the ZNode to set the permissions for 297 */ 298 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 299 List<String> children = recoverableZooKeeper.getChildren(znode, false); 300 301 for (String child : children) { 302 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 303 } 304 List<ACL> acls = createACL(znode, true); 305 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 306 recoverableZooKeeper.setAcl(znode, acls, -1); 307 } 308 309 /** 310 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 311 * @param acls acls from zookeeper 312 * @return whether ACLs are set for the base znode 313 * @throws IOException if getting the current user fails 314 */ 315 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug("Checking znode ACLs"); 318 } 319 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 320 // Check whether ACL set for all superusers 321 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 322 return false; 323 } 324 325 // this assumes that current authenticated user is the same as zookeeper client user 326 // configured via JAAS 327 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 328 329 if (acls.isEmpty()) { 330 if (LOG.isDebugEnabled()) { 331 LOG.debug("ACL is empty"); 332 } 333 return false; 334 } 335 336 for (ACL acl : acls) { 337 int perms = acl.getPerms(); 338 Id id = acl.getId(); 339 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 340 // and one for the hbase user 341 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 342 if (perms != Perms.READ) { 343 if (LOG.isDebugEnabled()) { 344 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 345 id, perms, Perms.READ)); 346 } 347 return false; 348 } 349 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 350 if (perms != Perms.ALL) { 351 if (LOG.isDebugEnabled()) { 352 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 353 id, perms, Perms.ALL)); 354 } 355 return false; 356 } 357 } else if ("sasl".equals(id.getScheme())) { 358 String name = id.getId(); 359 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 360 Matcher match = NAME_PATTERN.matcher(name); 361 if (match.matches()) { 362 name = match.group(1); 363 } 364 if (name.equals(hbaseUser)) { 365 if (perms != Perms.ALL) { 366 if (LOG.isDebugEnabled()) { 367 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 368 id, perms, Perms.ALL)); 369 } 370 return false; 371 } 372 } else { 373 if (LOG.isDebugEnabled()) { 374 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 375 } 376 return false; 377 } 378 } else { 379 if (LOG.isDebugEnabled()) { 380 LOG.debug("unexpected ACL id '{}'", id); 381 } 382 return false; 383 } 384 } 385 return true; 386 } 387 388 /* 389 * Validate whether ACL set for all superusers. 390 */ 391 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 392 for (String user : superUsers) { 393 boolean hasAccess = false; 394 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 395 if (!AuthUtil.isGroupPrincipal(user)) { 396 for (ACL acl : acls) { 397 if (user.equals(acl.getId().getId())) { 398 if (acl.getPerms() == Perms.ALL) { 399 hasAccess = true; 400 } else { 401 if (LOG.isDebugEnabled()) { 402 LOG.debug(String.format( 403 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 404 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 405 } 406 } 407 break; 408 } 409 } 410 if (!hasAccess) { 411 return false; 412 } 413 } 414 } 415 return true; 416 } 417 418 /* 419 * Validate whether ACL ID is superuser. 420 */ 421 public static boolean isSuperUserId(String[] superUsers, Id id) { 422 for (String user : superUsers) { 423 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 424 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 425 return true; 426 } 427 } 428 return false; 429 } 430 431 @Override 432 public String toString() { 433 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 434 } 435 436 /** 437 * Adds this instance's identifier as a prefix to the passed <code>str</code> 438 * @param str String to amend. 439 * @return A new string with this instance's identifier as prefix: e.g. if passed 'hello world', 440 * the returned string could be 441 */ 442 public String prefix(final String str) { 443 return this.toString() + " " + str; 444 } 445 446 /** 447 * Get the znodes corresponding to the meta replicas from ZK 448 * @return list of znodes 449 * @throws KeeperException if a ZooKeeper operation fails 450 */ 451 public List<String> getMetaReplicaNodes() throws KeeperException { 452 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 453 return filterMetaReplicaNodes(childrenOfBaseNode); 454 } 455 456 /** 457 * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode 458 * for subsequent CREATE/DELETE operations on child nodes. 459 */ 460 public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { 461 List<String> childrenOfBaseNode = 462 ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); 463 return filterMetaReplicaNodes(childrenOfBaseNode); 464 } 465 466 /** 467 * @param nodes Input list of znodes 468 * @return Filtered list of znodes from nodes that belong to meta replica(s). 469 */ 470 private List<String> filterMetaReplicaNodes(List<String> nodes) { 471 if (nodes == null || nodes.isEmpty()) { 472 return new ArrayList<>(); 473 } 474 List<String> metaReplicaNodes = new ArrayList<>(2); 475 String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); 476 for (String child : nodes) { 477 if (child.startsWith(pattern)) { 478 metaReplicaNodes.add(child); 479 } 480 } 481 return metaReplicaNodes; 482 } 483 484 /** 485 * Register the specified listener to receive ZooKeeper events. 486 * @param listener the listener to register 487 */ 488 public void registerListener(ZKListener listener) { 489 listeners.add(listener); 490 } 491 492 /** 493 * Register the specified listener to receive ZooKeeper events and add it as the first in the list 494 * of current listeners. 495 * @param listener the listener to register 496 */ 497 public void registerListenerFirst(ZKListener listener) { 498 listeners.add(0, listener); 499 } 500 501 public void unregisterListener(ZKListener listener) { 502 listeners.remove(listener); 503 } 504 505 /** 506 * Clean all existing listeners 507 */ 508 public void unregisterAllListeners() { 509 listeners.clear(); 510 } 511 512 /** 513 * Get a copy of current registered listeners 514 */ 515 public List<ZKListener> getListeners() { 516 return new ArrayList<>(listeners); 517 } 518 519 /** Returns The number of currently registered listeners */ 520 public int getNumberOfListeners() { 521 return listeners.size(); 522 } 523 524 /** 525 * Get the connection to ZooKeeper. 526 * @return connection reference to zookeeper 527 */ 528 public RecoverableZooKeeper getRecoverableZooKeeper() { 529 return recoverableZooKeeper; 530 } 531 532 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 533 recoverableZooKeeper.reconnectAfterExpiration(); 534 } 535 536 /** 537 * Get the quorum address of this instance. 538 * @return quorum string of this zookeeper connection instance 539 */ 540 public String getQuorum() { 541 return quorum; 542 } 543 544 /** 545 * Get the znodePaths. 546 * <p> 547 * Mainly used for mocking as mockito can not mock a field access. 548 */ 549 public ZNodePaths getZNodePaths() { 550 return znodePaths; 551 } 552 553 private void processEvent(WatchedEvent event) { 554 TraceUtil.trace(() -> { 555 switch (event.getType()) { 556 // If event type is NONE, this is a connection status change 557 case None: { 558 connectionEvent(event); 559 break; 560 } 561 562 // Otherwise pass along to the listeners 563 case NodeCreated: { 564 for (ZKListener listener : listeners) { 565 listener.nodeCreated(event.getPath()); 566 } 567 break; 568 } 569 570 case NodeDeleted: { 571 for (ZKListener listener : listeners) { 572 listener.nodeDeleted(event.getPath()); 573 } 574 break; 575 } 576 577 case NodeDataChanged: { 578 for (ZKListener listener : listeners) { 579 listener.nodeDataChanged(event.getPath()); 580 } 581 break; 582 } 583 584 case NodeChildrenChanged: { 585 for (ZKListener listener : listeners) { 586 listener.nodeChildrenChanged(event.getPath()); 587 } 588 break; 589 } 590 default: 591 LOG.error("Invalid event of type {} received for path {}. Ignoring.", event.getState(), 592 event.getPath()); 593 } 594 }, "ZKWatcher.processEvent: " + event.getType() + " " + event.getPath()); 595 } 596 597 /** 598 * Method called from ZooKeeper for events and connection status. 599 * <p> 600 * Valid events are passed along to listeners. Connection status changes are dealt with locally. 601 */ 602 @Override 603 public void process(WatchedEvent event) { 604 LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" 605 + event.getState() + ", " + "path=" + event.getPath())); 606 final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier; 607 if (!zkEventProcessor.isShutdown()) { 608 zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> processEvent(event), spanName)); 609 } 610 } 611 612 // Connection management 613 614 /** 615 * Called when there is a connection-related event via the Watcher callback. 616 * <p> 617 * If Disconnected or Expired, this should shutdown the cluster. But, since we send a 618 * KeeperException.SessionExpiredException along with the abort call, it's possible for the 619 * Abortable to catch it and try to create a new session with ZooKeeper. This is what the client 620 * does in HCM. 621 * <p> 622 * @param event the connection-related event 623 */ 624 private void connectionEvent(WatchedEvent event) { 625 switch (event.getState()) { 626 case SyncConnected: 627 this.identifier = 628 this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId()); 629 // Update our identifier. Otherwise ignore. 630 LOG.debug("{} connected", this.identifier); 631 break; 632 633 // Abort the server if Disconnected or Expired 634 case Disconnected: 635 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 636 break; 637 638 case Closed: 639 LOG.debug(prefix("ZooKeeper client closed, ignoring")); 640 break; 641 642 case Expired: 643 String msg = prefix(this.identifier + " received expired from " + "ZooKeeper, aborting"); 644 // TODO: One thought is to add call to ZKListener so say, 645 // ZKNodeTracker can zero out its data values. 646 if (this.abortable != null) { 647 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 648 } 649 break; 650 651 case ConnectedReadOnly: 652 case SaslAuthenticated: 653 case AuthFailed: 654 break; 655 656 default: 657 throw new IllegalStateException("Received event is not valid: " + event.getState()); 658 } 659 } 660 661 /** 662 * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a 663 * timeout lets the callers fail-fast rather than wait forever for the sync to finish. 664 * <p> 665 * Executing this method before running other methods will ensure that the subsequent operations 666 * are up-to-date and consistent as of the time that the sync is complete. 667 * <p> 668 * This is used for compareAndSwap type operations where we need to read the data of an existing 669 * node and delete or transition that node, utilizing the previously read version and data. We 670 * want to ensure that the version read is up-to-date from when we begin the operation. 671 * <p> 672 */ 673 public void syncOrTimeout(String path) throws KeeperException { 674 final CountDownLatch latch = new CountDownLatch(1); 675 long startTime = EnvironmentEdgeManager.currentTime(); 676 this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); 677 try { 678 if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { 679 LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " 680 + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); 681 throw new KeeperException.RequestTimeoutException(); 682 } 683 } catch (InterruptedException e) { 684 LOG.warn("Interrupted waiting for ZK sync() to finish.", e); 685 Thread.currentThread().interrupt(); 686 return; 687 } 688 if (LOG.isDebugEnabled()) { 689 // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a 690 // useful metric to have since the latency of sync() impacts the callers. 691 LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); 692 } 693 } 694 695 /** 696 * Handles KeeperExceptions in client calls. 697 * <p> 698 * This may be temporary but for now this gives one place to deal with these. 699 * <p> 700 * TODO: Currently this method rethrows the exception to let the caller handle 701 * <p> 702 * @param ke the exception to rethrow 703 * @throws KeeperException if a ZooKeeper operation fails 704 */ 705 public void keeperException(KeeperException ke) throws KeeperException { 706 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 707 throw ke; 708 } 709 710 /** 711 * Handles InterruptedExceptions in client calls. 712 * @param ie the InterruptedException instance thrown 713 * @throws KeeperException the exception to throw, transformed from the InterruptedException 714 */ 715 public void interruptedException(InterruptedException ie) throws KeeperException { 716 interruptedExceptionNoThrow(ie, true); 717 // Throw a system error exception to let upper level handle it 718 KeeperException keeperException = new KeeperException.SystemErrorException(); 719 keeperException.initCause(ie); 720 throw keeperException; 721 } 722 723 /** 724 * Log the InterruptedException and interrupt current thread 725 * @param ie The IterruptedException to log 726 * @param throwLater Whether we will throw the exception latter 727 */ 728 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 729 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 730 + (throwLater ? " and rethrow a SystemErrorException" : "")), ie); 731 // At least preserve interrupt. 732 Thread.currentThread().interrupt(); 733 } 734 735 /** 736 * Close the connection to ZooKeeper. 737 */ 738 @Override 739 public void close() { 740 zkEventProcessor.shutdown(); 741 try { 742 if (!zkEventProcessor.awaitTermination(15, TimeUnit.SECONDS)) { 743 LOG.warn("ZKWatcher event processor has not finished to terminate."); 744 zkEventProcessor.shutdownNow(); 745 } 746 } catch (InterruptedException e) { 747 Thread.currentThread().interrupt(); 748 } finally { 749 try { 750 recoverableZooKeeper.close(); 751 } catch (InterruptedException e) { 752 Thread.currentThread().interrupt(); 753 } 754 } 755 } 756 757 public Configuration getConfiguration() { 758 return conf; 759 } 760 761 @Override 762 public void abort(String why, Throwable e) { 763 if (this.abortable != null) { 764 this.abortable.abort(why, e); 765 } else { 766 this.aborted = true; 767 } 768 } 769 770 @Override 771 public boolean isAborted() { 772 return this.abortable == null ? this.aborted : this.abortable.isAborted(); 773 } 774}