001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.io.IOException; 024import java.lang.management.ManagementFactory; 025import java.util.ArrayList; 026import java.util.LinkedList; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.trace.TraceUtil; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.RetryCounter; 033import org.apache.hadoop.hbase.util.RetryCounterFactory; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.zookeeper.AsyncCallback; 036import org.apache.zookeeper.CreateMode; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.Op; 039import org.apache.zookeeper.OpResult; 040import org.apache.zookeeper.Watcher; 041import org.apache.zookeeper.ZooDefs; 042import org.apache.zookeeper.ZooKeeper; 043import org.apache.zookeeper.ZooKeeper.States; 044import org.apache.zookeeper.client.ZKClientConfig; 045import org.apache.zookeeper.data.ACL; 046import org.apache.zookeeper.data.Stat; 047import org.apache.zookeeper.proto.CreateRequest; 048import org.apache.zookeeper.proto.SetDataRequest; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A zookeeper that can handle 'recoverable' errors. 054 * <p> 055 * To handle recoverable errors, developers need to realize that there are two classes of requests: 056 * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are 057 * examples of idempotent requests, they can be reissued with the same results. 058 * <p> 059 * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is 060 * the same.) Non-idempotent requests need special handling, application and library writers need to 061 * keep in mind that they may need to encode information in the data or name of znodes to detect 062 * retries. A simple example is a create that uses a sequence flag. If a process issues a 063 * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue 064 * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 065 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result 066 * of the previous create, so the process actually owns both x-109 and x-111. An easy way around 067 * this is to use "x-process id-" when doing the create. If the process is using an id of 352, 068 * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 069 * "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it 070 * created is "x-352-109". 071 * @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling" 072 */ 073@InterfaceAudience.Private 074public class RecoverableZooKeeper { 075 private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); 076 // the actual ZooKeeper client instance 077 private ZooKeeper zk; 078 private final RetryCounterFactory retryCounterFactory; 079 // An identifier of this process in the cluster 080 private final String identifier; 081 private final byte[] id; 082 private final Watcher watcher; 083 private final int sessionTimeout; 084 private final String quorumServers; 085 private final int maxMultiSize; 086 private final ZKClientConfig zkClientConfig; 087 088 /** 089 * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}. 090 */ 091 public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) 092 throws IOException { 093 String ensemble = ZKConfig.getZKQuorumServersString(conf); 094 return connect(conf, ensemble, watcher, null, null); 095 } 096 097 /** 098 * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified 099 * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring 100 * watcher to the specified watcher. 101 * @param conf configuration to pull ensemble and other settings from 102 * @param watcher watcher to monitor connection changes 103 * @param ensemble ZooKeeper servers quorum string 104 * @param identifier value used to identify this client instance. 105 * @param zkClientConfig client specific configurations for this instance 106 * @return connection to zookeeper 107 * @throws IOException if unable to connect to zk or config problem 108 */ 109 public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, 110 final String identifier, ZKClientConfig zkClientConfig) throws IOException { 111 if (ensemble == null) { 112 throw new IOException("Unable to determine ZooKeeper ensemble"); 113 } 114 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 115 if (LOG.isTraceEnabled()) { 116 LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble); 117 } 118 int retry = conf.getInt("zookeeper.recovery.retry", 3); 119 int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); 120 int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); 121 int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024); 122 return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis, 123 maxSleepTime, identifier, multiMaxSize, zkClientConfig); 124 } 125 126 RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries, 127 int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize, 128 ZKClientConfig zkClientConfig) throws IOException { 129 // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 130 this.retryCounterFactory = 131 new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime); 132 133 if (identifier == null || identifier.length() == 0) { 134 // the identifier = processID@hostName 135 identifier = ManagementFactory.getRuntimeMXBean().getName(); 136 } 137 LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier, 138 quorumServers); 139 this.identifier = identifier; 140 this.id = Bytes.toBytes(identifier); 141 142 this.watcher = watcher; 143 this.sessionTimeout = sessionTimeout; 144 this.quorumServers = quorumServers; 145 this.maxMultiSize = maxMultiSize; 146 this.zkClientConfig = zkClientConfig; 147 } 148 149 /** 150 * Returns the maximum size (in bytes) that should be included in any single multi() call. NB: 151 * This is an approximation, so there may be variance in the msg actually sent over the wire. 152 * Please be sure to set this approximately, with respect to your ZK server configuration for 153 * jute.maxbuffer. 154 */ 155 public int getMaxMultiSizeLimit() { 156 return maxMultiSize; 157 } 158 159 /** 160 * Try to create a ZooKeeper connection. Turns any exception encountered into a 161 * KeeperException.OperationTimeoutException so it can retried. 162 * @return The created ZooKeeper connection object 163 * @throws KeeperException if a ZooKeeper operation fails 164 */ 165 private synchronized ZooKeeper checkZk() throws KeeperException { 166 if (this.zk == null) { 167 try { 168 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig); 169 } catch (IOException ex) { 170 LOG.warn("Unable to create ZooKeeper Connection", ex); 171 throw new KeeperException.OperationTimeoutException(); 172 } 173 } 174 return zk; 175 } 176 177 public synchronized void reconnectAfterExpiration() 178 throws IOException, KeeperException, InterruptedException { 179 if (zk != null) { 180 LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x" 181 + Long.toHexString(zk.getSessionId())); 182 zk.close(); 183 // reset the ZooKeeper connection 184 zk = null; 185 } 186 checkZk(); 187 LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId())); 188 } 189 190 /** 191 * delete is an idempotent operation. Retry before throwing exception. This function will not 192 * throw NoNodeException if the path does not exist. 193 */ 194 public void delete(String path, int version) throws InterruptedException, KeeperException { 195 final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete"); 196 try (Scope ignored = span.makeCurrent()) { 197 RetryCounter retryCounter = retryCounterFactory.create(); 198 boolean isRetry = false; // False for first attempt, true for all retries. 199 while (true) { 200 try { 201 checkZk().delete(path, version); 202 span.setStatus(StatusCode.OK); 203 return; 204 } catch (KeeperException e) { 205 switch (e.code()) { 206 case NONODE: 207 if (isRetry) { 208 LOG.debug( 209 "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded."); 210 return; 211 } 212 LOG.debug("Node {} already deleted, retry={}", path, isRetry); 213 TraceUtil.setError(span, e); 214 throw e; 215 216 case CONNECTIONLOSS: 217 case OPERATIONTIMEOUT: 218 case REQUESTTIMEOUT: 219 TraceUtil.setError(span, e); 220 retryOrThrow(retryCounter, e, "delete"); 221 break; 222 223 default: 224 TraceUtil.setError(span, e); 225 throw e; 226 } 227 } 228 retryCounter.sleepUntilNextRetry(); 229 isRetry = true; 230 } 231 } finally { 232 span.end(); 233 } 234 } 235 236 /** 237 * exists is an idempotent operation. Retry before throwing exception 238 * @return A Stat instance 239 */ 240 public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { 241 return exists(path, watcher, null); 242 } 243 244 private Stat exists(String path, Watcher watcher, Boolean watch) 245 throws InterruptedException, KeeperException { 246 final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists"); 247 try (Scope ignored = span.makeCurrent()) { 248 RetryCounter retryCounter = retryCounterFactory.create(); 249 while (true) { 250 try { 251 Stat nodeStat; 252 if (watch == null) { 253 nodeStat = checkZk().exists(path, watcher); 254 } else { 255 nodeStat = checkZk().exists(path, watch); 256 } 257 span.setStatus(StatusCode.OK); 258 return nodeStat; 259 } catch (KeeperException e) { 260 switch (e.code()) { 261 case CONNECTIONLOSS: 262 case OPERATIONTIMEOUT: 263 case REQUESTTIMEOUT: 264 TraceUtil.setError(span, e); 265 retryOrThrow(retryCounter, e, "exists"); 266 break; 267 268 default: 269 TraceUtil.setError(span, e); 270 throw e; 271 } 272 } 273 retryCounter.sleepUntilNextRetry(); 274 } 275 } finally { 276 span.end(); 277 } 278 } 279 280 /** 281 * exists is an idempotent operation. Retry before throwing exception 282 * @return A Stat instance 283 */ 284 public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { 285 return exists(path, null, watch); 286 } 287 288 private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName) 289 throws KeeperException { 290 if (!retryCounter.shouldRetry()) { 291 LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts()); 292 throw e; 293 } 294 LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e); 295 } 296 297 /** 298 * getChildren is an idempotent operation. Retry before throwing exception 299 * @return List of children znodes 300 */ 301 public List<String> getChildren(String path, Watcher watcher) 302 throws KeeperException, InterruptedException { 303 return getChildren(path, watcher, null); 304 } 305 306 private List<String> getChildren(String path, Watcher watcher, Boolean watch) 307 throws InterruptedException, KeeperException { 308 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren"); 309 try (Scope ignored = span.makeCurrent()) { 310 RetryCounter retryCounter = retryCounterFactory.create(); 311 while (true) { 312 try { 313 List<String> children; 314 if (watch == null) { 315 children = checkZk().getChildren(path, watcher); 316 } else { 317 children = checkZk().getChildren(path, watch); 318 } 319 span.setStatus(StatusCode.OK); 320 return children; 321 } catch (KeeperException e) { 322 switch (e.code()) { 323 case CONNECTIONLOSS: 324 case OPERATIONTIMEOUT: 325 case REQUESTTIMEOUT: 326 TraceUtil.setError(span, e); 327 retryOrThrow(retryCounter, e, "getChildren"); 328 break; 329 330 default: 331 TraceUtil.setError(span, e); 332 throw e; 333 } 334 } 335 retryCounter.sleepUntilNextRetry(); 336 } 337 } finally { 338 span.end(); 339 } 340 } 341 342 /** 343 * getChildren is an idempotent operation. Retry before throwing exception 344 * @return List of children znodes 345 */ 346 public List<String> getChildren(String path, boolean watch) 347 throws KeeperException, InterruptedException { 348 return getChildren(path, null, watch); 349 } 350 351 /** 352 * getData is an idempotent operation. Retry before throwing exception 353 */ 354 public byte[] getData(String path, Watcher watcher, Stat stat) 355 throws KeeperException, InterruptedException { 356 return getData(path, watcher, null, stat); 357 } 358 359 private byte[] getData(String path, Watcher watcher, Boolean watch, Stat stat) 360 throws InterruptedException, KeeperException { 361 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData"); 362 try (Scope ignored = span.makeCurrent()) { 363 RetryCounter retryCounter = retryCounterFactory.create(); 364 while (true) { 365 try { 366 byte[] revData; 367 if (watch == null) { 368 revData = checkZk().getData(path, watcher, stat); 369 } else { 370 revData = checkZk().getData(path, watch, stat); 371 } 372 span.setStatus(StatusCode.OK); 373 return ZKMetadata.removeMetaData(revData); 374 } catch (KeeperException e) { 375 switch (e.code()) { 376 case CONNECTIONLOSS: 377 case OPERATIONTIMEOUT: 378 case REQUESTTIMEOUT: 379 TraceUtil.setError(span, e); 380 retryOrThrow(retryCounter, e, "getData"); 381 break; 382 383 default: 384 TraceUtil.setError(span, e); 385 throw e; 386 } 387 } 388 retryCounter.sleepUntilNextRetry(); 389 } 390 } finally { 391 span.end(); 392 } 393 } 394 395 /** 396 * getData is an idempotent operation. Retry before throwing exception 397 */ 398 public byte[] getData(String path, boolean watch, Stat stat) 399 throws KeeperException, InterruptedException { 400 return getData(path, null, watch, stat); 401 } 402 403 /** 404 * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an 405 * identifier field into the data to check whether badversion is caused by the result of previous 406 * correctly setData 407 * @return Stat instance 408 */ 409 public Stat setData(String path, byte[] data, int version) 410 throws KeeperException, InterruptedException { 411 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData"); 412 try (Scope ignored = span.makeCurrent()) { 413 RetryCounter retryCounter = retryCounterFactory.create(); 414 byte[] newData = ZKMetadata.appendMetaData(id, data); 415 boolean isRetry = false; 416 while (true) { 417 try { 418 span.setStatus(StatusCode.OK); 419 return checkZk().setData(path, newData, version); 420 } catch (KeeperException e) { 421 switch (e.code()) { 422 case CONNECTIONLOSS: 423 case OPERATIONTIMEOUT: 424 case REQUESTTIMEOUT: 425 TraceUtil.setError(span, e); 426 retryOrThrow(retryCounter, e, "setData"); 427 break; 428 case BADVERSION: 429 if (isRetry) { 430 // try to verify whether the previous setData success or not 431 try { 432 Stat stat = new Stat(); 433 byte[] revData = checkZk().getData(path, false, stat); 434 if (Bytes.compareTo(revData, newData) == 0) { 435 // the bad version is caused by previous successful setData 436 return stat; 437 } 438 } catch (KeeperException keeperException) { 439 // the ZK is not reliable at this moment. just throwing exception 440 TraceUtil.setError(span, e); 441 throw keeperException; 442 } 443 } 444 // throw other exceptions and verified bad version exceptions 445 default: 446 TraceUtil.setError(span, e); 447 throw e; 448 } 449 } 450 retryCounter.sleepUntilNextRetry(); 451 isRetry = true; 452 } 453 } finally { 454 span.end(); 455 } 456 } 457 458 /** 459 * getAcl is an idempotent operation. Retry before throwing exception 460 * @return list of ACLs 461 */ 462 public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException { 463 final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl"); 464 try (Scope ignored = span.makeCurrent()) { 465 RetryCounter retryCounter = retryCounterFactory.create(); 466 while (true) { 467 try { 468 span.setStatus(StatusCode.OK); 469 return checkZk().getACL(path, stat); 470 } catch (KeeperException e) { 471 switch (e.code()) { 472 case CONNECTIONLOSS: 473 case OPERATIONTIMEOUT: 474 case REQUESTTIMEOUT: 475 TraceUtil.setError(span, e); 476 retryOrThrow(retryCounter, e, "getAcl"); 477 break; 478 479 default: 480 TraceUtil.setError(span, e); 481 throw e; 482 } 483 } 484 retryCounter.sleepUntilNextRetry(); 485 } 486 } finally { 487 span.end(); 488 } 489 } 490 491 /** 492 * setAcl is an idempotent operation. Retry before throwing exception 493 * @return list of ACLs 494 */ 495 public Stat setAcl(String path, List<ACL> acls, int version) 496 throws KeeperException, InterruptedException { 497 final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl"); 498 try (Scope ignored = span.makeCurrent()) { 499 RetryCounter retryCounter = retryCounterFactory.create(); 500 while (true) { 501 try { 502 span.setStatus(StatusCode.OK); 503 return checkZk().setACL(path, acls, version); 504 } catch (KeeperException e) { 505 switch (e.code()) { 506 case CONNECTIONLOSS: 507 case OPERATIONTIMEOUT: 508 TraceUtil.setError(span, e); 509 retryOrThrow(retryCounter, e, "setAcl"); 510 break; 511 512 default: 513 TraceUtil.setError(span, e); 514 throw e; 515 } 516 } 517 retryCounter.sleepUntilNextRetry(); 518 } 519 } finally { 520 span.end(); 521 } 522 } 523 524 /** 525 * <p> 526 * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this 527 * function will not throw the NodeExist exception back to the application. 528 * </p> 529 * <p> 530 * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to 531 * verify, whether the previous one is successful or not. 532 * </p> 533 */ 534 public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 535 throws KeeperException, InterruptedException { 536 final Span span = TraceUtil.createSpan("RecoverableZookeeper.create"); 537 try (Scope ignored = span.makeCurrent()) { 538 byte[] newData = ZKMetadata.appendMetaData(id, data); 539 switch (createMode) { 540 case EPHEMERAL: 541 case PERSISTENT: 542 span.setStatus(StatusCode.OK); 543 return createNonSequential(path, newData, acl, createMode); 544 545 case EPHEMERAL_SEQUENTIAL: 546 case PERSISTENT_SEQUENTIAL: 547 span.setStatus(StatusCode.OK); 548 return createSequential(path, newData, acl, createMode); 549 550 default: 551 final IllegalArgumentException e = 552 new IllegalArgumentException("Unrecognized CreateMode: " + createMode); 553 TraceUtil.setError(span, e); 554 throw e; 555 } 556 } finally { 557 span.end(); 558 } 559 } 560 561 private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 562 throws KeeperException, InterruptedException { 563 RetryCounter retryCounter = retryCounterFactory.create(); 564 boolean isRetry = false; // False for first attempt, true for all retries. 565 while (true) { 566 try { 567 return checkZk().create(path, data, acl, createMode); 568 } catch (KeeperException e) { 569 switch (e.code()) { 570 case NODEEXISTS: 571 if (isRetry) { 572 // If the connection was lost, there is still a possibility that 573 // we have successfully created the node at our previous attempt, 574 // so we read the node and compare. 575 byte[] currentData = checkZk().getData(path, false, null); 576 if (currentData != null && Bytes.compareTo(currentData, data) == 0) { 577 // We successfully created a non-sequential node 578 return path; 579 } 580 LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) 581 + ", could not write " + Bytes.toStringBinary(data)); 582 throw e; 583 } 584 LOG.trace("Node {} already exists", path); 585 throw e; 586 587 case CONNECTIONLOSS: 588 case OPERATIONTIMEOUT: 589 case REQUESTTIMEOUT: 590 retryOrThrow(retryCounter, e, "create"); 591 break; 592 593 default: 594 throw e; 595 } 596 } 597 retryCounter.sleepUntilNextRetry(); 598 isRetry = true; 599 } 600 } 601 602 private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode) 603 throws KeeperException, InterruptedException { 604 RetryCounter retryCounter = retryCounterFactory.create(); 605 boolean first = true; 606 String newPath = path + this.identifier; 607 while (true) { 608 try { 609 if (!first) { 610 // Check if we succeeded on a previous attempt 611 String previousResult = findPreviousSequentialNode(newPath); 612 if (previousResult != null) { 613 return previousResult; 614 } 615 } 616 first = false; 617 return checkZk().create(newPath, data, acl, createMode); 618 } catch (KeeperException e) { 619 switch (e.code()) { 620 case CONNECTIONLOSS: 621 case OPERATIONTIMEOUT: 622 case REQUESTTIMEOUT: 623 retryOrThrow(retryCounter, e, "create"); 624 break; 625 626 default: 627 throw e; 628 } 629 } 630 retryCounter.sleepUntilNextRetry(); 631 } 632 } 633 634 /** 635 * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to 636 * actually pass to multi (need to do this in order to appendMetaData). 637 */ 638 private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException { 639 if (ops == null) { 640 return null; 641 } 642 643 List<Op> preparedOps = new LinkedList<>(); 644 for (Op op : ops) { 645 if (op.getType() == ZooDefs.OpCode.create) { 646 CreateRequest create = (CreateRequest) op.toRequestRecord(); 647 preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), 648 create.getAcl(), create.getFlags())); 649 } else if (op.getType() == ZooDefs.OpCode.delete) { 650 // no need to appendMetaData for delete 651 preparedOps.add(op); 652 } else if (op.getType() == ZooDefs.OpCode.setData) { 653 SetDataRequest setData = (SetDataRequest) op.toRequestRecord(); 654 preparedOps.add(Op.setData(setData.getPath(), 655 ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion())); 656 } else { 657 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); 658 } 659 } 660 return preparedOps; 661 } 662 663 /** 664 * Run multiple operations in a transactional manner. Retry before throwing exception 665 */ 666 public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException { 667 final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi"); 668 try (Scope ignored = span.makeCurrent()) { 669 RetryCounter retryCounter = retryCounterFactory.create(); 670 Iterable<Op> multiOps = prepareZKMulti(ops); 671 while (true) { 672 try { 673 span.setStatus(StatusCode.OK); 674 return checkZk().multi(multiOps); 675 } catch (KeeperException e) { 676 switch (e.code()) { 677 case CONNECTIONLOSS: 678 case OPERATIONTIMEOUT: 679 case REQUESTTIMEOUT: 680 TraceUtil.setError(span, e); 681 retryOrThrow(retryCounter, e, "multi"); 682 break; 683 684 default: 685 TraceUtil.setError(span, e); 686 throw e; 687 } 688 } 689 retryCounter.sleepUntilNextRetry(); 690 } 691 } finally { 692 span.end(); 693 } 694 } 695 696 private String findPreviousSequentialNode(String path) 697 throws KeeperException, InterruptedException { 698 int lastSlashIdx = path.lastIndexOf('/'); 699 assert (lastSlashIdx != -1); 700 String parent = path.substring(0, lastSlashIdx); 701 String nodePrefix = path.substring(lastSlashIdx + 1); 702 List<String> nodes = checkZk().getChildren(parent, false); 703 List<String> matching = filterByPrefix(nodes, nodePrefix); 704 for (String node : matching) { 705 String nodePath = parent + "/" + node; 706 Stat stat = checkZk().exists(nodePath, false); 707 if (stat != null) { 708 return nodePath; 709 } 710 } 711 return null; 712 } 713 714 public synchronized long getSessionId() { 715 return zk == null ? -1 : zk.getSessionId(); 716 } 717 718 public synchronized void close() throws InterruptedException { 719 if (zk != null) { 720 zk.close(); 721 } 722 } 723 724 public synchronized States getState() { 725 return zk == null ? null : zk.getState(); 726 } 727 728 public synchronized ZooKeeper getZooKeeper() { 729 return zk; 730 } 731 732 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { 733 checkZk().sync(path, cb, ctx); 734 } 735 736 /** 737 * Filters the given node list by the given prefixes. This method is all-inclusive--if any element 738 * in the node list starts with any of the given prefixes, then it is included in the result. 739 * @param nodes the nodes to filter 740 * @param prefixes the prefixes to include in the result 741 * @return list of every element that starts with one of the prefixes 742 */ 743 private static List<String> filterByPrefix(List<String> nodes, String... prefixes) { 744 List<String> lockChildren = new ArrayList<>(); 745 for (String child : nodes) { 746 for (String prefix : prefixes) { 747 if (child.startsWith(prefix)) { 748 lockChildren.add(child); 749 break; 750 } 751 } 752 } 753 return lockChildren; 754 } 755 756 public String getIdentifier() { 757 return identifier; 758 } 759}