001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.lang.management.ManagementFactory;
025import java.util.ArrayList;
026import java.util.LinkedList;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.trace.TraceUtil;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.hbase.util.RetryCounter;
034import org.apache.hadoop.hbase.util.RetryCounterFactory;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.zookeeper.AsyncCallback;
037import org.apache.zookeeper.CreateMode;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.Op;
040import org.apache.zookeeper.OpResult;
041import org.apache.zookeeper.Watcher;
042import org.apache.zookeeper.ZooDefs;
043import org.apache.zookeeper.ZooKeeper;
044import org.apache.zookeeper.ZooKeeper.States;
045import org.apache.zookeeper.client.ZKClientConfig;
046import org.apache.zookeeper.data.ACL;
047import org.apache.zookeeper.data.Stat;
048import org.apache.zookeeper.proto.CreateRequest;
049import org.apache.zookeeper.proto.SetDataRequest;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * A zookeeper that can handle 'recoverable' errors.
055 * <p>
056 * To handle recoverable errors, developers need to realize that there are two classes of requests:
057 * idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are
058 * examples of idempotent requests, they can be reissued with the same results.
059 * <p>
060 * (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is
061 * the same.) Non-idempotent requests need special handling, application and library writers need to
062 * keep in mind that they may need to encode information in the data or name of znodes to detect
063 * retries. A simple example is a create that uses a sequence flag. If a process issues a
064 * create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue
065 * another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
066 * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result
067 * of the previous create, so the process actually owns both x-109 and x-111. An easy way around
068 * this is to use "x-process id-" when doing the create. If the process is using an id of 352,
069 * before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
 * "x-352-109", "x-333-110". The process will know that the original create succeeded and the
 * znode it created is "x-352-109".
072 * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
073 */
074@InterfaceAudience.Private
075public class RecoverableZooKeeper {
076  private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
077  // the actual ZooKeeper client instance
078  private ZooKeeper zk;
079  private final RetryCounterFactory retryCounterFactory;
080  // An identifier of this process in the cluster
081  private final String identifier;
082  private final byte[] id;
083  private final Watcher watcher;
084  private final int sessionTimeout;
085  private final String quorumServers;
086  private final int maxMultiSize;
087  private final ZKClientConfig zkClientConfig;
088
089  /**
090   * See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
091   */
092  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
093    throws IOException {
094    String ensemble = ZKConfig.getZKQuorumServersString(conf);
095    return connect(conf, ensemble, watcher, null, null);
096  }
097
098  /**
099   * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
100   * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
101   * watcher to the specified watcher.
102   * @param conf           configuration to pull ensemble and other settings from
103   * @param watcher        watcher to monitor connection changes
104   * @param ensemble       ZooKeeper servers quorum string
105   * @param identifier     value used to identify this client instance.
106   * @param zkClientConfig client specific configurations for this instance
107   * @return connection to zookeeper
108   * @throws IOException if unable to connect to zk or config problem
109   */
110  public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
111    final String identifier, ZKClientConfig zkClientConfig) throws IOException {
112    if (ensemble == null) {
113      throw new IOException("Unable to determine ZooKeeper ensemble");
114    }
115    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
116    if (LOG.isTraceEnabled()) {
117      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
118    }
119    int retry = conf.getInt("zookeeper.recovery.retry", 3);
120    int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
121    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
122    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
123    return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
124      maxSleepTime, identifier, multiMaxSize, zkClientConfig);
125  }
126
127  RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries,
128    int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
129    ZKClientConfig zkClientConfig) throws IOException {
130    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
131    this.retryCounterFactory =
132      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
133
134    if (identifier == null || identifier.length() == 0) {
135      // the identifier = processID@hostName
136      identifier = ManagementFactory.getRuntimeMXBean().getName();
137    }
138    LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
139      quorumServers);
140    this.identifier = identifier;
141    this.id = Bytes.toBytes(identifier);
142
143    this.watcher = watcher;
144    this.sessionTimeout = sessionTimeout;
145    this.quorumServers = quorumServers;
146    this.maxMultiSize = maxMultiSize;
147    this.zkClientConfig = zkClientConfig;
148  }
149
150  /**
151   * Returns the maximum size (in bytes) that should be included in any single multi() call. NB:
152   * This is an approximation, so there may be variance in the msg actually sent over the wire.
153   * Please be sure to set this approximately, with respect to your ZK server configuration for
154   * jute.maxbuffer.
155   */
156  public int getMaxMultiSizeLimit() {
157    return maxMultiSize;
158  }
159
160  /**
161   * Try to create a ZooKeeper connection. Turns any exception encountered into a
162   * KeeperException.OperationTimeoutException so it can retried.
163   * @return The created ZooKeeper connection object
164   * @throws KeeperException if a ZooKeeper operation fails
165   */
166  private synchronized ZooKeeper checkZk() throws KeeperException {
167    if (this.zk == null) {
168      try {
169        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig);
170      } catch (IOException ex) {
171        LOG.warn("Unable to create ZooKeeper Connection", ex);
172        throw new KeeperException.OperationTimeoutException();
173      }
174    }
175    return zk;
176  }
177
178  public synchronized void reconnectAfterExpiration()
179    throws IOException, KeeperException, InterruptedException {
180    if (zk != null) {
181      LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x"
182        + Long.toHexString(zk.getSessionId()));
183      zk.close();
184      // reset the ZooKeeper connection
185      zk = null;
186    }
187    checkZk();
188    LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId()));
189  }
190
191  /**
192   * delete is an idempotent operation. Retry before throwing exception. This function will not
193   * throw NoNodeException if the path does not exist.
194   */
195  public void delete(String path, int version) throws InterruptedException, KeeperException {
196    final Span span = TraceUtil.createSpan("RecoverableZookeeper.delete");
197    try (Scope ignored = span.makeCurrent()) {
198      RetryCounter retryCounter = retryCounterFactory.create();
199      boolean isRetry = false; // False for first attempt, true for all retries.
200      while (true) {
201        try {
202          long startTime = EnvironmentEdgeManager.currentTime();
203          checkZk().delete(path, version);
204          span.setStatus(StatusCode.OK);
205          return;
206        } catch (KeeperException e) {
207          switch (e.code()) {
208            case NONODE:
209              if (isRetry) {
210                LOG.debug(
211                  "Node " + path + " already deleted. Assuming a " + "previous attempt succeeded.");
212                span.setStatus(StatusCode.OK);
213                return;
214              }
215              LOG.debug("Node {} already deleted, retry={}", path, isRetry);
216              TraceUtil.setError(span, e);
217              throw e;
218
219            case CONNECTIONLOSS:
220            case OPERATIONTIMEOUT:
221            case REQUESTTIMEOUT:
222              TraceUtil.setError(span, e);
223              retryOrThrow(retryCounter, e, "delete");
224              break;
225
226            default:
227              TraceUtil.setError(span, e);
228              throw e;
229          }
230        }
231        retryCounter.sleepUntilNextRetry();
232        isRetry = true;
233      }
234    } finally {
235      span.end();
236    }
237  }
238
239  /**
240   * exists is an idempotent operation. Retry before throwing exception
241   * @return A Stat instance
242   */
243  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
244    final Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
245    try (Scope ignored = span.makeCurrent()) {
246      RetryCounter retryCounter = retryCounterFactory.create();
247      while (true) {
248        try {
249          long startTime = EnvironmentEdgeManager.currentTime();
250          Stat nodeStat = checkZk().exists(path, watcher);
251          span.setStatus(StatusCode.OK);
252          return nodeStat;
253        } catch (KeeperException e) {
254          switch (e.code()) {
255            case CONNECTIONLOSS:
256            case OPERATIONTIMEOUT:
257            case REQUESTTIMEOUT:
258              TraceUtil.setError(span, e);
259              retryOrThrow(retryCounter, e, "exists");
260              break;
261
262            default:
263              TraceUtil.setError(span, e);
264              throw e;
265          }
266        }
267        retryCounter.sleepUntilNextRetry();
268      }
269    } finally {
270      span.end();
271    }
272  }
273
274  /**
275   * exists is an idempotent operation. Retry before throwing exception
276   * @return A Stat instance
277   */
278  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
279    Span span = TraceUtil.createSpan("RecoverableZookeeper.exists");
280    try (Scope ignored = span.makeCurrent()) {
281      RetryCounter retryCounter = retryCounterFactory.create();
282      while (true) {
283        try {
284          long startTime = EnvironmentEdgeManager.currentTime();
285          Stat nodeStat = checkZk().exists(path, watch);
286          span.setStatus(StatusCode.OK);
287          return nodeStat;
288        } catch (KeeperException e) {
289          switch (e.code()) {
290            case CONNECTIONLOSS:
291              TraceUtil.setError(span, e);
292              retryOrThrow(retryCounter, e, "exists");
293              break;
294            case OPERATIONTIMEOUT:
295              TraceUtil.setError(span, e);
296              retryOrThrow(retryCounter, e, "exists");
297              break;
298
299            default:
300              TraceUtil.setError(span, e);
301              throw e;
302          }
303        }
304        retryCounter.sleepUntilNextRetry();
305      }
306    } finally {
307      span.end();
308    }
309  }
310
311  private void retryOrThrow(RetryCounter retryCounter, KeeperException e, String opName)
312    throws KeeperException {
313    if (!retryCounter.shouldRetry()) {
314      LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
315      throw e;
316    }
317    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
318  }
319
320  /**
321   * getChildren is an idempotent operation. Retry before throwing exception
322   * @return List of children znodes
323   */
324  public List<String> getChildren(String path, Watcher watcher)
325    throws KeeperException, InterruptedException {
326    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
327    try (Scope ignored = span.makeCurrent()) {
328      RetryCounter retryCounter = retryCounterFactory.create();
329      while (true) {
330        try {
331          long startTime = EnvironmentEdgeManager.currentTime();
332          List<String> children = checkZk().getChildren(path, watcher);
333          span.setStatus(StatusCode.OK);
334          return children;
335        } catch (KeeperException e) {
336          switch (e.code()) {
337            case CONNECTIONLOSS:
338            case OPERATIONTIMEOUT:
339            case REQUESTTIMEOUT:
340              TraceUtil.setError(span, e);
341              retryOrThrow(retryCounter, e, "getChildren");
342              break;
343
344            default:
345              TraceUtil.setError(span, e);
346              throw e;
347          }
348        }
349        retryCounter.sleepUntilNextRetry();
350      }
351    } finally {
352      span.end();
353    }
354  }
355
356  /**
357   * getChildren is an idempotent operation. Retry before throwing exception
358   * @return List of children znodes
359   */
360  public List<String> getChildren(String path, boolean watch)
361    throws KeeperException, InterruptedException {
362    Span span = TraceUtil.createSpan("RecoverableZookeeper.getChildren");
363    try (Scope ignored = span.makeCurrent()) {
364      RetryCounter retryCounter = retryCounterFactory.create();
365      while (true) {
366        try {
367          long startTime = EnvironmentEdgeManager.currentTime();
368          List<String> children = checkZk().getChildren(path, watch);
369          span.setStatus(StatusCode.OK);
370          return children;
371        } catch (KeeperException e) {
372          switch (e.code()) {
373            case CONNECTIONLOSS:
374              TraceUtil.setError(span, e);
375              retryOrThrow(retryCounter, e, "getChildren");
376              break;
377            case OPERATIONTIMEOUT:
378              TraceUtil.setError(span, e);
379              retryOrThrow(retryCounter, e, "getChildren");
380              break;
381
382            default:
383              TraceUtil.setError(span, e);
384              throw e;
385          }
386        }
387        retryCounter.sleepUntilNextRetry();
388      }
389    } finally {
390      span.end();
391    }
392  }
393
394  /**
395   * getData is an idempotent operation. Retry before throwing exception
396   */
397  public byte[] getData(String path, Watcher watcher, Stat stat)
398    throws KeeperException, InterruptedException {
399    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getData");
400    try (Scope ignored = span.makeCurrent()) {
401      RetryCounter retryCounter = retryCounterFactory.create();
402      while (true) {
403        try {
404          long startTime = EnvironmentEdgeManager.currentTime();
405          byte[] revData = checkZk().getData(path, watcher, stat);
406          span.setStatus(StatusCode.OK);
407          return ZKMetadata.removeMetaData(revData);
408        } catch (KeeperException e) {
409          switch (e.code()) {
410            case CONNECTIONLOSS:
411            case OPERATIONTIMEOUT:
412            case REQUESTTIMEOUT:
413              TraceUtil.setError(span, e);
414              retryOrThrow(retryCounter, e, "getData");
415              break;
416
417            default:
418              TraceUtil.setError(span, e);
419              throw e;
420          }
421        }
422        retryCounter.sleepUntilNextRetry();
423      }
424    } finally {
425      span.end();
426    }
427  }
428
429  /**
430   * getData is an idempotent operation. Retry before throwing exception
431   */
432  public byte[] getData(String path, boolean watch, Stat stat)
433    throws KeeperException, InterruptedException {
434    Span span = TraceUtil.getGlobalTracer().spanBuilder("RecoverableZookeeper.getData").startSpan();
435    try (Scope scope = span.makeCurrent()) {
436      RetryCounter retryCounter = retryCounterFactory.create();
437      while (true) {
438        try {
439          long startTime = EnvironmentEdgeManager.currentTime();
440          byte[] revData = checkZk().getData(path, watch, stat);
441          span.setStatus(StatusCode.OK);
442          return ZKMetadata.removeMetaData(revData);
443        } catch (KeeperException e) {
444          switch (e.code()) {
445            case CONNECTIONLOSS:
446              TraceUtil.setError(span, e);
447              retryOrThrow(retryCounter, e, "getData");
448              break;
449            case OPERATIONTIMEOUT:
450              TraceUtil.setError(span, e);
451              retryOrThrow(retryCounter, e, "getData");
452              break;
453
454            default:
455              TraceUtil.setError(span, e);
456              throw e;
457          }
458        }
459        retryCounter.sleepUntilNextRetry();
460      }
461    } finally {
462      span.end();
463    }
464  }
465
466  /**
467   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception Adding an
468   * identifier field into the data to check whether badversion is caused by the result of previous
469   * correctly setData
470   * @return Stat instance
471   */
472  public Stat setData(String path, byte[] data, int version)
473    throws KeeperException, InterruptedException {
474    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setData");
475    try (Scope ignored = span.makeCurrent()) {
476      RetryCounter retryCounter = retryCounterFactory.create();
477      byte[] newData = ZKMetadata.appendMetaData(id, data);
478      boolean isRetry = false;
479      long startTime;
480      while (true) {
481        try {
482          startTime = EnvironmentEdgeManager.currentTime();
483          Stat nodeStat = checkZk().setData(path, newData, version);
484          span.setStatus(StatusCode.OK);
485          return nodeStat;
486        } catch (KeeperException e) {
487          switch (e.code()) {
488            case CONNECTIONLOSS:
489            case OPERATIONTIMEOUT:
490            case REQUESTTIMEOUT:
491              TraceUtil.setError(span, e);
492              retryOrThrow(retryCounter, e, "setData");
493              break;
494            case BADVERSION:
495              if (isRetry) {
496                // try to verify whether the previous setData success or not
497                try {
498                  Stat stat = new Stat();
499                  byte[] revData = checkZk().getData(path, false, stat);
500                  if (Bytes.compareTo(revData, newData) == 0) {
501                    // the bad version is caused by previous successful setData
502                    span.setStatus(StatusCode.OK);
503                    return stat;
504                  }
505                } catch (KeeperException keeperException) {
506                  // the ZK is not reliable at this moment. just throwing exception
507                  TraceUtil.setError(span, keeperException);
508                  throw keeperException;
509                }
510              }
511              // throw other exceptions and verified bad version exceptions
512            default:
513              TraceUtil.setError(span, e);
514              throw e;
515          }
516        }
517        retryCounter.sleepUntilNextRetry();
518        isRetry = true;
519      }
520    } finally {
521      span.end();
522    }
523  }
524
525  /**
526   * getAcl is an idempotent operation. Retry before throwing exception
527   * @return list of ACLs
528   */
529  public List<ACL> getAcl(String path, Stat stat) throws KeeperException, InterruptedException {
530    final Span span = TraceUtil.createSpan("RecoverableZookeeper.getAcl");
531    try (Scope ignored = span.makeCurrent()) {
532      RetryCounter retryCounter = retryCounterFactory.create();
533      while (true) {
534        try {
535          long startTime = EnvironmentEdgeManager.currentTime();
536          List<ACL> nodeACL = checkZk().getACL(path, stat);
537          span.setStatus(StatusCode.OK);
538          return nodeACL;
539        } catch (KeeperException e) {
540          switch (e.code()) {
541            case CONNECTIONLOSS:
542            case OPERATIONTIMEOUT:
543            case REQUESTTIMEOUT:
544              TraceUtil.setError(span, e);
545              retryOrThrow(retryCounter, e, "getAcl");
546              break;
547
548            default:
549              TraceUtil.setError(span, e);
550              throw e;
551          }
552        }
553        retryCounter.sleepUntilNextRetry();
554      }
555    } finally {
556      span.end();
557    }
558  }
559
560  /**
561   * setAcl is an idempotent operation. Retry before throwing exception
562   * @return list of ACLs
563   */
564  public Stat setAcl(String path, List<ACL> acls, int version)
565    throws KeeperException, InterruptedException {
566    final Span span = TraceUtil.createSpan("RecoverableZookeeper.setAcl");
567    try (Scope ignored = span.makeCurrent()) {
568      RetryCounter retryCounter = retryCounterFactory.create();
569      while (true) {
570        try {
571          long startTime = EnvironmentEdgeManager.currentTime();
572          Stat nodeStat = checkZk().setACL(path, acls, version);
573          span.setStatus(StatusCode.OK);
574          return nodeStat;
575        } catch (KeeperException e) {
576          switch (e.code()) {
577            case CONNECTIONLOSS:
578            case OPERATIONTIMEOUT:
579              TraceUtil.setError(span, e);
580              retryOrThrow(retryCounter, e, "setAcl");
581              break;
582
583            default:
584              TraceUtil.setError(span, e);
585              throw e;
586          }
587        }
588        retryCounter.sleepUntilNextRetry();
589      }
590    } finally {
591      span.end();
592    }
593  }
594
595  /**
596   * <p>
597   * NONSEQUENTIAL create is idempotent operation. Retry before throwing exceptions. But this
598   * function will not throw the NodeExist exception back to the application.
599   * </p>
600   * <p>
601   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add identifier to the path to
602   * verify, whether the previous one is successful or not.
603   * </p>
604   */
605  public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
606    throws KeeperException, InterruptedException {
607    final Span span = TraceUtil.createSpan("RecoverableZookeeper.create");
608    try (Scope ignored = span.makeCurrent()) {
609      byte[] newData = ZKMetadata.appendMetaData(id, data);
610      switch (createMode) {
611        case EPHEMERAL:
612        case PERSISTENT:
613          span.setStatus(StatusCode.OK);
614          return createNonSequential(path, newData, acl, createMode);
615
616        case EPHEMERAL_SEQUENTIAL:
617        case PERSISTENT_SEQUENTIAL:
618          span.setStatus(StatusCode.OK);
619          return createSequential(path, newData, acl, createMode);
620
621        default:
622          final IllegalArgumentException e =
623            new IllegalArgumentException("Unrecognized CreateMode: " + createMode);
624          TraceUtil.setError(span, e);
625          throw e;
626      }
627    } finally {
628      span.end();
629    }
630  }
631
632  private String createNonSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
633    throws KeeperException, InterruptedException {
634    RetryCounter retryCounter = retryCounterFactory.create();
635    boolean isRetry = false; // False for first attempt, true for all retries.
636    long startTime;
637    while (true) {
638      try {
639        startTime = EnvironmentEdgeManager.currentTime();
640        String nodePath = checkZk().create(path, data, acl, createMode);
641        return nodePath;
642      } catch (KeeperException e) {
643        switch (e.code()) {
644          case NODEEXISTS:
645            if (isRetry) {
646              // If the connection was lost, there is still a possibility that
647              // we have successfully created the node at our previous attempt,
648              // so we read the node and compare.
649              byte[] currentData = checkZk().getData(path, false, null);
650              if (currentData != null && Bytes.compareTo(currentData, data) == 0) {
651                // We successfully created a non-sequential node
652                return path;
653              }
654              LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData)
655                + ", could not write " + Bytes.toStringBinary(data));
656              throw e;
657            }
658            LOG.trace("Node {} already exists", path);
659            throw e;
660
661          case CONNECTIONLOSS:
662          case OPERATIONTIMEOUT:
663          case REQUESTTIMEOUT:
664            retryOrThrow(retryCounter, e, "create");
665            break;
666
667          default:
668            throw e;
669        }
670      }
671      retryCounter.sleepUntilNextRetry();
672      isRetry = true;
673    }
674  }
675
676  private String createSequential(String path, byte[] data, List<ACL> acl, CreateMode createMode)
677    throws KeeperException, InterruptedException {
678    RetryCounter retryCounter = retryCounterFactory.create();
679    boolean first = true;
680    String newPath = path + this.identifier;
681    while (true) {
682      try {
683        if (!first) {
684          // Check if we succeeded on a previous attempt
685          String previousResult = findPreviousSequentialNode(newPath);
686          if (previousResult != null) {
687            return previousResult;
688          }
689        }
690        first = false;
691        long startTime = EnvironmentEdgeManager.currentTime();
692        String nodePath = checkZk().create(newPath, data, acl, createMode);
693        return nodePath;
694      } catch (KeeperException e) {
695        switch (e.code()) {
696          case CONNECTIONLOSS:
697          case OPERATIONTIMEOUT:
698          case REQUESTTIMEOUT:
699            retryOrThrow(retryCounter, e, "create");
700            break;
701
702          default:
703            throw e;
704        }
705      }
706      retryCounter.sleepUntilNextRetry();
707    }
708  }
709
710  /**
711   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op instances to
712   * actually pass to multi (need to do this in order to appendMetaData).
713   */
714  private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
715    if (ops == null) {
716      return null;
717    }
718
719    List<Op> preparedOps = new LinkedList<>();
720    for (Op op : ops) {
721      if (op.getType() == ZooDefs.OpCode.create) {
722        CreateRequest create = (CreateRequest) op.toRequestRecord();
723        preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
724          create.getAcl(), create.getFlags()));
725      } else if (op.getType() == ZooDefs.OpCode.delete) {
726        // no need to appendMetaData for delete
727        preparedOps.add(op);
728      } else if (op.getType() == ZooDefs.OpCode.setData) {
729        SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
730        preparedOps.add(Op.setData(setData.getPath(),
731          ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
732      } else {
733        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
734      }
735    }
736    return preparedOps;
737  }
738
739  /**
740   * Run multiple operations in a transactional manner. Retry before throwing exception
741   */
742  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
743    final Span span = TraceUtil.createSpan("RecoverableZookeeper.multi");
744    try (Scope ignored = span.makeCurrent()) {
745      RetryCounter retryCounter = retryCounterFactory.create();
746      Iterable<Op> multiOps = prepareZKMulti(ops);
747      while (true) {
748        try {
749          long startTime = EnvironmentEdgeManager.currentTime();
750          List<OpResult> opResults = checkZk().multi(multiOps);
751          span.setStatus(StatusCode.OK);
752          return opResults;
753        } catch (KeeperException e) {
754          switch (e.code()) {
755            case CONNECTIONLOSS:
756            case OPERATIONTIMEOUT:
757            case REQUESTTIMEOUT:
758              TraceUtil.setError(span, e);
759              retryOrThrow(retryCounter, e, "multi");
760              break;
761
762            default:
763              TraceUtil.setError(span, e);
764              throw e;
765          }
766        }
767        retryCounter.sleepUntilNextRetry();
768      }
769    } finally {
770      span.end();
771    }
772  }
773
774  private String findPreviousSequentialNode(String path)
775    throws KeeperException, InterruptedException {
776    int lastSlashIdx = path.lastIndexOf('/');
777    assert (lastSlashIdx != -1);
778    String parent = path.substring(0, lastSlashIdx);
779    String nodePrefix = path.substring(lastSlashIdx + 1);
780    long startTime = EnvironmentEdgeManager.currentTime();
781    List<String> nodes = checkZk().getChildren(parent, false);
782    List<String> matching = filterByPrefix(nodes, nodePrefix);
783    for (String node : matching) {
784      String nodePath = parent + "/" + node;
785      startTime = EnvironmentEdgeManager.currentTime();
786      Stat stat = checkZk().exists(nodePath, false);
787      if (stat != null) {
788        return nodePath;
789      }
790    }
791    return null;
792  }
793
794  public synchronized long getSessionId() {
795    return zk == null ? -1 : zk.getSessionId();
796  }
797
798  public synchronized void close() throws InterruptedException {
799    if (zk != null) {
800      zk.close();
801    }
802  }
803
804  public synchronized States getState() {
805    return zk == null ? null : zk.getState();
806  }
807
808  public synchronized ZooKeeper getZooKeeper() {
809    return zk;
810  }
811
812  public synchronized byte[] getSessionPasswd() {
813    return zk == null ? null : zk.getSessionPasswd();
814  }
815
816  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
817    checkZk().sync(path, cb, ctx);
818  }
819
820  /**
821   * Filters the given node list by the given prefixes. This method is all-inclusive--if any element
822   * in the node list starts with any of the given prefixes, then it is included in the result.
823   * @param nodes    the nodes to filter
824   * @param prefixes the prefixes to include in the result
825   * @return list of every element that starts with one of the prefixes
826   */
827  private static List<String> filterByPrefix(List<String> nodes, String... prefixes) {
828    List<String> lockChildren = new ArrayList<>();
829    for (String child : nodes) {
830      for (String prefix : prefixes) {
831        if (child.startsWith(prefix)) {
832          lockChildren.add(child);
833          break;
834        }
835      }
836    }
837    return lockChildren;
838  }
839
840  public String getIdentifier() {
841    return identifier;
842  }
843}