001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.io.IOException; 024import java.lang.management.ManagementFactory; 025import java.util.ArrayList; 026import java.util.LinkedList; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.trace.TraceUtil; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 033import org.apache.hadoop.hbase.util.RetryCounter; 034import org.apache.hadoop.hbase.util.RetryCounterFactory; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.zookeeper.AsyncCallback; 037import org.apache.zookeeper.CreateMode; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.Op; 040import org.apache.zookeeper.OpResult; 041import org.apache.zookeeper.Watcher; 042import org.apache.zookeeper.ZooDefs; 043import org.apache.zookeeper.ZooKeeper; 044import 
org.apache.zookeeper.ZooKeeper.States; 045import org.apache.zookeeper.client.ZKClientConfig; 046import org.apache.zookeeper.data.ACL; 047import org.apache.zookeeper.data.Stat; 048import org.apache.zookeeper.proto.CreateRequest; 049import org.apache.zookeeper.proto.SetDataRequest; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * A zookeeper that can handle 'recoverable' errors. 055 * <p> 056 * To handle recoverable errors, developers need to realize that there are two classes of requests: 057 * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are 058 * examples of idempotent requests, they can be reissued with the same results. 059 * <p> 060 * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is 061 * the same.) Non-idempotent requests need special handling, application and library writers need to 062 * keep in mind that they may need to encode information in the data or name of znodes to detect 063 * retries. A simple example is a create that uses a sequence flag. If a process issues a 064 * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue 065 * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 066 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result 067 * of the previous create, so the process actually owns both x-109 and x-111. An easy way around 068 * this is to use "x-process id-" when doing the create. If the process is using an id of 352, 069 * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 070 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it 071 * created is "x-352-109". 
* @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
 */
@InterfaceAudience.Private
public class RecoverableZooKeeper {
  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
  // the actual ZooKeeper client instance; created lazily by checkZk()
  private ZooKeeper zk;
  private final RetryCounterFactory retryCounterFactory;
  // An identifier of this process in the cluster
  private final String identifier;
  private final byte[] id;
  private final Watcher watcher;
  private final int sessionTimeout;
  private final String quorumServers;
  private final int maxMultiSize;
  private final ZKClientConfig zkClientConfig;

  /**
   * A single call against the underlying {@link ZooKeeper} client. Implementations are invoked by
   * {@code callWithRetries} and may therefore be issued more than once, so they should be
   * idempotent.
   * @param <T> result type of the call
   */
  @FunctionalInterface
  private interface ZKOperation<T> {
    T call(ZooKeeper client) throws KeeperException, InterruptedException;
  }

  /**
   * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
   */
  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
    throws IOException {
    String ensemble = ZKConfig.getZKQuorumServersString(conf);
    return connect(conf, ensemble, watcher, null, null);
  }

  /**
   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
   * watcher to the specified watcher.
   * @param conf           configuration to pull ensemble and other settings from
   * @param watcher        watcher to monitor connection changes
   * @param ensemble       ZooKeeper servers quorum string
   * @param identifier     value used to identify this client instance.
   * @param zkClientConfig client specific configurations for this instance
   * @return connection to zookeeper
   * @throws IOException if unable to connect to zk or config problem
   */
  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
    final String identifier, ZKClientConfig zkClientConfig) throws IOException {
    if (ensemble == null) {
      throw new IOException("Unable to determine ZooKeeper ensemble");
    }
    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
    }
    int retry = conf.getInt("zookeeper.recovery.retry", 3);
    int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
    return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
      maxSleepTime, identifier, multiMaxSize, zkClientConfig);
  }

  RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries,
    int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
    ZKClientConfig zkClientConfig) throws IOException {
    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
    // maxRetries counts retries only; the +1 accounts for the initial attempt.
    this.retryCounterFactory =
      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);

    if (identifier == null || identifier.isEmpty()) {
      // the identifier = processID@hostName
      identifier = ManagementFactory.getRuntimeMXBean().getName();
    }
    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
      quorumServers);
    this.identifier = identifier;
    this.id = Bytes.toBytes(identifier);

    this.watcher = watcher;
    this.sessionTimeout = sessionTimeout;
    this.quorumServers = quorumServers;
    this.maxMultiSize = maxMultiSize;
    this.zkClientConfig = zkClientConfig;
  }

  /**
   * Returns the maximum size (in bytes) that should be included in any single multi() call. NB:
   * This is an approximation, so there may be variance in the msg actually sent over the wire.
   * Please be sure to set this approximately, with respect to your ZK server configuration for
   * jute.maxbuffer.
   */
  public int getMaxMultiSizeLimit() {
    return maxMultiSize;
  }

  /**
   * Try to create a ZooKeeper connection. Turns any exception encountered into a
   * KeeperException.OperationTimeoutException so it can be retried.
   * @return The created ZooKeeper connection object
   * @throws KeeperException if a ZooKeeper operation fails
   */
  private synchronized ZooKeeper checkZk() throws KeeperException {
    if (this.zk == null) {
      try {
        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig);
      } catch (IOException ex) {
        LOG.warn("Unable to create ZooKeeper Connection", ex);
        throw new KeeperException.OperationTimeoutException();
      }
    }
    return zk;
  }

  /**
   * Closes the current ZooKeeper client (if any) and creates a fresh one.
   */
  public synchronized void reconnectAfterExpiration()
    throws IOException, KeeperException, InterruptedException {
    if (zk != null) {
      LOG.info("Closing dead ZooKeeper connection, session was: 0x{}",
        Long.toHexString(zk.getSessionId()));
      zk.close();
      // reset the ZooKeeper connection so checkZk() builds a new one
      zk = null;
    }
    ZooKeeper newZk = checkZk();
    LOG.info("Recreated a ZooKeeper, session is: 0x{}", Long.toHexString(newZk.getSessionId()));
  }

  /**
   * delete is an idempotent operation. Retry before throwing exception.
   * <p>
   * A NoNodeException on the first attempt is thrown to the caller. A NoNodeException on a retry
   * is treated as success, on the assumption that an earlier attempt deleted the node before the
   * connection was lost.
   */
  public void delete(String path, int version) throws InterruptedException, KeeperException {
    final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
    try (Scope ignored = span.makeCurrent()) {
      RetryCounter retryCounter = retryCounterFactory.create();
      boolean isRetry = false; // False for first attempt, true for all retries.
      while (true) {
        try {
          checkZk().delete(path, version);
          span.setStatus(StatusCode.OK);
          return;
        } catch (KeeperException e) {
          switch (e.code()) {
            case NONODE:
              if (isRetry) {
                LOG.debug("Node {} already deleted. Assuming a previous attempt succeeded.", path);
                span.setStatus(StatusCode.OK);
                return;
              }
              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
              TraceUtil.setError(span, e);
              throw e;

            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:
            case REQUESTTIMEOUT:
              TraceUtil.setError(span, e);
              retryOrThrow(retryCounter, e, "delete");
              break;

            default:
              TraceUtil.setError(span, e);
              throw e;
          }
        }
        retryCounter.sleepUntilNextRetry();
        isRetry = true;
      }
    } finally {
      span.end();
    }
  }

  /**
   * exists is an idempotent operation. Retry before throwing exception
   * @return A Stat instance
   */
  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.exists", "exists",
      client -> client.exists(path, watcher));
  }

  /**
   * exists is an idempotent operation. Retry before throwing exception
   * @return A Stat instance
   */
  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.exists", "exists",
      client -> client.exists(path, watch));
  }

  /**
   * Runs {@code operation} inside a tracing span named {@code spanName}, retrying on
   * CONNECTIONLOSS, OPERATIONTIMEOUT and REQUESTTIMEOUT until the retry budget is exhausted. Any
   * other KeeperException is recorded on the span and rethrown immediately.
   * @param spanName  name of the tracing span to create
   * @param opName    operation name used in retry log messages
   * @param operation the ZooKeeper call to issue; may be invoked multiple times
   * @return the result of the first successful invocation
   */
  private <T> T callWithRetries(String spanName, String opName, ZKOperation<T> operation)
    throws KeeperException, InterruptedException {
    final Span span = TraceUtil.createSpan(spanName);
    try (Scope ignored = span.makeCurrent()) {
      RetryCounter retryCounter = retryCounterFactory.create();
      while (true) {
        try {
          T result = operation.call(checkZk());
          span.setStatus(StatusCode.OK);
          return result;
        } catch (KeeperException e) {
          TraceUtil.setError(span, e);
          if (!isRetriable(e.code())) {
            throw e;
          }
          retryOrThrow(retryCounter, e, opName);
        }
        retryCounter.sleepUntilNextRetry();
      }
    } finally {
      span.end();
    }
  }

  /** Returns true for the connectivity-related error codes that this class retries. */
  private static boolean isRetriable(KeeperException.Code code) {
    switch (code) {
      case CONNECTIONLOSS:
      case OPERATIONTIMEOUT:
      case REQUESTTIMEOUT:
        return true;
      default:
        return false;
    }
  }

  /**
   * Rethrows {@code e} if the retry budget is exhausted; otherwise logs and returns so the caller
   * can sleep and retry.
   */
  private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName)
    throws KeeperException {
    if (!retryCounter.shouldRetry()) {
      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
      throw e;
    }
    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception={}", quorumServers, e);
  }

  /**
   * getChildren is an idempotent operation. Retry before throwing exception
   * @return List of children znodes
   */
  public List<String> getChildren(String path, Watcher watcher)
    throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.getChildren", "getChildren",
      client -> client.getChildren(path, watcher));
  }

  /**
   * getChildren is an idempotent operation. Retry before throwing exception
   * @return List of children znodes
   */
  public List<String> getChildren(String path, boolean watch)
    throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.getChildren", "getChildren",
      client -> client.getChildren(path, watch));
  }

  /**
   * getData is an idempotent operation. Retry before throwing exception
   * @return the node's data with {@link ZKMetadata#removeMetaData(byte[])} applied
   */
  public byte[] getData(String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.getData", "getData",
      client -> ZKMetadata.removeMetaData(client.getData(path, watcher, stat)));
  }

  /**
   * getData is an idempotent operation. Retry before throwing exception
   * @return the node's data with {@link ZKMetadata#removeMetaData(byte[])} applied
   */
  public byte[] getData(String path, boolean watch, Stat stat)
    throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.getData", "getData",
      client -> ZKMetadata.removeMetaData(client.getData(path, watch, stat)));
  }

  /**
   * setData is NOT an idempotent operation. A retry may fail with BadVersion. To detect whether
   * that BadVersion was caused by an earlier attempt that actually succeeded, this method appends
   * an identifier to the data and, on a retried BadVersion, compares the stored bytes with what it
   * sent.
   * @return Stat instance
   */
  public Stat setData(String path, byte[] data, int version)
    throws KeeperException, InterruptedException {
    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
    try (Scope ignored = span.makeCurrent()) {
      RetryCounter retryCounter = retryCounterFactory.create();
      byte[] newData = ZKMetadata.appendMetaData(id, data);
      boolean isRetry = false;
      while (true) {
        try {
          Stat nodeStat = checkZk().setData(path, newData, version);
          span.setStatus(StatusCode.OK);
          return nodeStat;
        } catch (KeeperException e) {
          switch (e.code()) {
            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:
            case REQUESTTIMEOUT:
              TraceUtil.setError(span, e);
              retryOrThrow(retryCounter, e, "setData");
              break;
            case BADVERSION:
              if (isRetry) {
                // try to verify whether the previous setData success or not: if the stored
                // bytes equal exactly what we sent (data plus our appended metadata), the bad
                // version was caused by our own earlier, successful attempt.
                try {
                  Stat stat = new Stat();
                  byte[] revData = checkZk().getData(path, false, stat);
                  if (Bytes.compareTo(revData, newData) == 0) {
                    span.setStatus(StatusCode.OK);
                    return stat;
                  }
                } catch (KeeperException keeperException) {
                  // the ZK is not reliable at this moment. just throwing exception
                  TraceUtil.setError(span, keeperException);
                  throw keeperException;
                }
              }
              // fall through: throw first-attempt and unverified bad version exceptions
            default:
              TraceUtil.setError(span, e);
              throw e;
          }
        }
        retryCounter.sleepUntilNextRetry();
        isRetry = true;
      }
    } finally {
      span.end();
    }
  }

  /**
   * getAcl is an idempotent operation. Retry before throwing exception
   * @return list of ACLs
   */
  public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.getAcl", "getAcl",
      client -> client.getACL(path, stat));
  }

  /**
   * setAcl is an idempotent operation. Retry before throwing exception
   * @return the Stat of the node after the ACL update
   */
  public Stat setAcl(String path, List<ACL> acls, int version)
    throws KeeperException, InterruptedException {
    return callWithRetries("RecoverableZookeeper.setAcl", "setAcl",
      client -> client.setACL(path, acls, version));
  }

  /**
   * <p>
   * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. On a retry,
   * a NodeExists error is swallowed if the existing node holds exactly the data this call wrote
   * (i.e. an earlier attempt succeeded); otherwise it is thrown.
   * </p>
   * <p>
   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to
   * verify, whether the previous one is successful or not.
   * </p>
   */
  public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
    throws KeeperException, InterruptedException {
    final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
    try (Scope ignored = span.makeCurrent()) {
      byte[] newData = ZKMetadata.appendMetaData(id, data);
      final String createdPath;
      switch (createMode) {
        case EPHEMERAL:
        case PERSISTENT:
          createdPath = createNonSequential(path, newData, acl, createMode);
          break;

        case EPHEMERAL_SEQUENTIAL:
        case PERSISTENT_SEQUENTIAL:
          createdPath = createSequential(path, newData, acl, createMode);
          break;

        default:
          final IllegalArgumentException iae =
            new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
          TraceUtil.setError(span, iae);
          throw iae;
      }
      // Only mark the span OK once the create has actually succeeded.
      span.setStatus(StatusCode.OK);
      return createdPath;
    } catch (KeeperException e) {
      TraceUtil.setError(span, e);
      throw e;
    } finally {
      span.end();
    }
  }

  private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
    throws KeeperException, InterruptedException {
    RetryCounter retryCounter = retryCounterFactory.create();
    boolean isRetry = false; // False for first attempt, true for all retries.
    while (true) {
      try {
        return checkZk().create(path, data, acl, createMode);
      } catch (KeeperException e) {
        switch (e.code()) {
          case NODEEXISTS:
            if (isRetry) {
              // If the connection was lost, there is still a possibility that
              // we have successfully created the node at our previous attempt,
              // so we read the node and compare.
              byte[] currentData = checkZk().getData(path, false, null);
              if (currentData != null && Bytes.compareTo(currentData, data) == 0) {
                // We successfully created a non-sequential node
                return path;
              }
              LOG.error("Node {} already exists with {}, could not write {}", path,
                Bytes.toStringBinary(currentData), Bytes.toStringBinary(data));
              throw e;
            }
            LOG.trace("Node {} already exists", path);
            throw e;

          case CONNECTIONLOSS:
          case OPERATIONTIMEOUT:
          case REQUESTTIMEOUT:
            retryOrThrow(retryCounter, e, "create");
            break;

          default:
            throw e;
        }
      }
      retryCounter.sleepUntilNextRetry();
      isRetry = true;
    }
  }

  private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
    throws KeeperException, InterruptedException {
    RetryCounter retryCounter = retryCounterFactory.create();
    boolean first = true;
    // Embed our identifier in the node name so a retry can recognise its own earlier node.
    String newPath = path + this.identifier;
    while (true) {
      try {
        if (!first) {
          // Check if we succeeded on a previous attempt
          String previousResult = findPreviousSequentialNode(newPath);
          if (previousResult != null) {
            return previousResult;
          }
        }
        first = false;
        return checkZk().create(newPath, data, acl, createMode);
      } catch (KeeperException e) {
        switch (e.code()) {
          case CONNECTIONLOSS:
          case OPERATIONTIMEOUT:
          case REQUESTTIMEOUT:
            retryOrThrow(retryCounter, e, "create");
            break;

          default:
            throw e;
        }
      }
      retryCounter.sleepUntilNextRetry();
    }
  }

  /**
   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to
   * actually pass to multi (need to do this in order to appendMetaData).
   * @return the prepared ops, or null if {@code ops} is null
   * @throws UnsupportedOperationException for any op other than create, delete or setData
   */
  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
    if (ops == null) {
      return null;
    }

    List<Op> preparedOps = new ArrayList<>();
    for (Op op : ops) {
      if (op.getType() == ZooDefs.OpCode.create) {
        CreateRequest create = (CreateRequest) op.toRequestRecord();
        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
          create.getAcl(), create.getFlags()));
      } else if (op.getType() == ZooDefs.OpCode.delete) {
        // no need to appendMetaData for delete
        preparedOps.add(op);
      } else if (op.getType() == ZooDefs.OpCode.setData) {
        SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
        preparedOps.add(Op.setData(setData.getPath(),
          ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
      } else {
        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
      }
    }
    return preparedOps;
  }

  /**
   * Run multiple operations in a transactional manner. Retry before throwing exception.
   * NOTE(review): if an attempt commits but the reply is lost, the retried transaction may fail
   * (e.g. NodeExists for a create op) — callers should be prepared for that.
   */
  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
    final Iterable<Op> multiOps = prepareZKMulti(ops);
    return callWithRetries("RecoverableZookeeper.multi", "multi",
      client -> client.multi(multiOps));
  }

  /**
   * Looks for an existing child of {@code path}'s parent whose name starts with {@code path}'s
   * last segment.
   * @return the full path of the first such node that still exists, or null if none
   */
  private String findPreviousSequentialNode(String path)
    throws KeeperException, InterruptedException {
    int lastSlashIdx = path.lastIndexOf('/');
    assert (lastSlashIdx != -1);
    String parent = path.substring(0, lastSlashIdx);
    String nodePrefix = path.substring(lastSlashIdx + 1);
    List<String> nodes = checkZk().getChildren(parent, false);
    List<String> matching = filterByPrefix(nodes, nodePrefix);
    for (String node : matching) {
      String nodePath = parent + "/" + node;
      Stat stat = checkZk().exists(nodePath, false);
      if (stat != null) {
        return nodePath;
      }
    }
    return null;
  }

  /** Returns the current session id, or -1 if no client has been created yet. */
  public synchronized long getSessionId() {
    return zk == null ? -1 : zk.getSessionId();
  }

  /** Closes the underlying client if one has been created. */
  public synchronized void close() throws InterruptedException {
    if (zk != null) {
      zk.close();
    }
  }

  /** Returns the client's state, or null if no client has been created yet. */
  public synchronized States getState() {
    return zk == null ? null : zk.getState();
  }

  /** Returns the underlying client; may be null if no client has been created yet. */
  public synchronized ZooKeeper getZooKeeper() {
    return zk;
  }

  /** Returns the session password, or null if no client has been created yet. */
  public synchronized byte[] getSessionPasswd() {
    return zk == null ? null : zk.getSessionPasswd();
  }

  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
    checkZk().sync(path, cb, ctx);
  }

  /**
   * Filters the given node list by the given prefixes. This method is all-inclusive--if any element
   * in the node list starts with any of the given prefixes, then it is included in the result.
   * @param nodes    the nodes to filter
   * @param prefixes the prefixes to include in the result
   * @return list of every element that starts with one of the prefixes
   */
  private static List<String> filterByPrefix(List<String> nodes, String... prefixes) {
    List<String> lockChildren = new ArrayList<>();
    for (String child : nodes) {
      for (String prefix : prefixes) {
        if (child.startsWith(prefix)) {
          lockChildren.add(child);
          break;
        }
      }
    }
    return lockChildren;
  }

  public String getIdentifier() {
    return identifier;
  }
}