001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.List;
023import org.apache.hadoop.hbase.zookeeper.ZKListener;
024import org.apache.hadoop.hbase.zookeeper.ZKUtil;
025import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
026import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.zookeeper.KeeperException;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * This is a shared ZooKeeper-based znode management utils for distributed procedure. All znode
034 * operations should go through the provided methods in coordinators and members. Layout of nodes in
035 * ZK is /hbase/[op name]/acquired/ [op instance] - op data/ /[nodes that have acquired] /reached/
036 * [op instance]/ /[nodes that have completed] /abort/ [op instance] - failure data NOTE: while
037 * acquired and completed are znode dirs, abort is actually just a znode. Assumption here that
038 * procedure names are unique
039 */
040@InterfaceAudience.Private
041public abstract class ZKProcedureUtil extends ZKListener implements Closeable {
042
043  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureUtil.class);
044
045  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
046  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
047  public static final String ABORT_ZNODE_DEFAULT = "abort";
048
049  public final String baseZNode;
050  protected final String acquiredZnode;
051  protected final String reachedZnode;
052  protected final String abortZnode;
053
054  /**
055   * Top-level watcher/controller for procedures across the cluster.
056   * <p>
057   * On instantiation, this ensures the procedure znodes exist. This however requires the passed in
058   * watcher has been started.
059   * @param watcher         watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
060   *                        {@link #close()}
061   * @param procDescription name of the znode describing the procedure to run
062   * @throws KeeperException when the procedure znodes cannot be created
063   */
064  public ZKProcedureUtil(ZKWatcher watcher, String procDescription) throws KeeperException {
065    super(watcher);
066    // make sure we are listening for events
067    watcher.registerListener(this);
068    // setup paths for the zknodes used in procedures
069    this.baseZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, procDescription);
070    acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
071    reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
072    abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
073
074    // first make sure all the ZK nodes exist
075    // make sure all the parents exist (sometimes not the case in tests)
076    ZKUtil.createWithParents(watcher, acquiredZnode);
077    // regular create because all the parents exist
078    ZKUtil.createAndFailSilent(watcher, reachedZnode);
079    ZKUtil.createAndFailSilent(watcher, abortZnode);
080  }
081
082  @Override
083  public void close() throws IOException {
084    // the watcher is passed from either Master or Region Server
085    // watcher.close() will be called by the owner so no need to call close() here
086  }
087
088  public String getAcquiredBarrierNode(String opInstanceName) {
089    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
090  }
091
092  public String getReachedBarrierNode(String opInstanceName) {
093    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
094  }
095
096  public String getAbortZNode(String opInstanceName) {
097    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
098  }
099
100  public String getAbortZnode() {
101    return abortZnode;
102  }
103
104  public String getBaseZnode() {
105    return baseZNode;
106  }
107
108  public String getAcquiredBarrier() {
109    return acquiredZnode;
110  }
111
112  /**
113   * Get the full znode path for the node used by the coordinator to trigger a global barrier
114   * acquire on each subprocedure.
115   * @param controller     controller running the procedure
116   * @param opInstanceName name of the running procedure instance (not the procedure description).
117   * @return full znode path to the prepare barrier/start node
118   */
119  public static String getAcquireBarrierNode(ZKProcedureUtil controller, String opInstanceName) {
120    return ZNodePaths.joinZNode(controller.acquiredZnode, opInstanceName);
121  }
122
123  /**
124   * Get the full znode path for the node used by the coordinator to trigger a global barrier
125   * execution and release on each subprocedure.
126   * @param controller     controller running the procedure
127   * @param opInstanceName name of the running procedure instance (not the procedure description).
128   * @return full znode path to the commit barrier
129   */
130  public static String getReachedBarrierNode(ZKProcedureUtil controller, String opInstanceName) {
131    return ZNodePaths.joinZNode(controller.reachedZnode, opInstanceName);
132  }
133
134  /**
135   * Get the full znode path for the node used by the coordinator or member to trigger an abort of
136   * the global barrier acquisition or execution in subprocedures.
137   * @param controller     controller running the procedure
138   * @param opInstanceName name of the running procedure instance (not the procedure description).
139   * @return full znode path to the abort znode
140   */
141  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
142    return ZNodePaths.joinZNode(controller.abortZnode, opInstanceName);
143  }
144
145  @Override
146  public ZKWatcher getWatcher() {
147    return watcher;
148  }
149
150  /**
151   * Is this a procedure related znode path? TODO: this is not strict, can return true if had name
152   * just starts with same prefix but is different zdir.
153   * @return true if starts with baseZnode
154   */
155  boolean isInProcedurePath(String path) {
156    return path.startsWith(baseZNode);
157  }
158
159  /**
160   * Is this the exact procedure barrier acquired znode
161   */
162  boolean isAcquiredNode(String path) {
163    return path.equals(acquiredZnode);
164  }
165
166  /**
167   * Is this in the procedure barrier acquired znode path
168   */
169  boolean isAcquiredPathNode(String path) {
170    return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode)
171      && isMemberNode(path, acquiredZnode);
172  }
173
174  /**
175   * Is this the exact procedure barrier reached znode
176   */
177  boolean isReachedNode(String path) {
178    return path.equals(reachedZnode);
179  }
180
181  /**
182   * Is this in the procedure barrier reached znode path
183   */
184  boolean isReachedPathNode(String path) {
185    return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode)
186      && isMemberNode(path, reachedZnode);
187  }
188
189  /*
190   * Returns true if the specified path is a member of the "statePath"
191   * /hbase/<ProcName>/<state>/<instance>/member |------ state path -----| |------------------ path
192   * ------------------|
193   */
194  private boolean isMemberNode(final String path, final String statePath) {
195    int count = 0;
196    for (int i = statePath.length(); i < path.length(); ++i) {
197      count += (path.charAt(i) == ZNodePaths.ZNODE_PATH_SEPARATOR) ? 1 : 0;
198    }
199    return count == 2;
200  }
201
202  /**
203   * Is this in the procedure barrier abort znode path
204   */
205  boolean isAbortNode(String path) {
206    return path.equals(abortZnode);
207  }
208
209  /**
210   * Is this in the procedure barrier abort znode path
211   */
212  public boolean isAbortPathNode(String path) {
213    return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
214  }
215
216  // --------------------------------------------------------------------------
217  // internal debugging methods
218  // --------------------------------------------------------------------------
219  /**
220   * Recursively print the current state of ZK (non-transactional)
221   * @param root name of the root directory in zk to print
222   */
223  void logZKTree(String root) {
224    if (!LOG.isDebugEnabled()) return;
225    LOG.debug("Current zk system:");
226    String prefix = "|-";
227    LOG.debug(prefix + root);
228    try {
229      logZKTree(root, prefix);
230    } catch (KeeperException e) {
231      throw new RuntimeException(e);
232    }
233  }
234
235  /**
236   * Helper method to print the current state of the ZK tree.
237   * @see #logZKTree(String)
238   * @throws KeeperException if an unexpected exception occurs
239   */
240  protected void logZKTree(String root, String prefix) throws KeeperException {
241    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
242    if (children == null) return;
243    for (String child : children) {
244      LOG.debug(prefix + child);
245      String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
246      logZKTree(node, prefix + "---");
247    }
248  }
249
250  public void clearChildZNodes() throws KeeperException {
251    LOG.debug("Clearing all znodes {}, {}, {}", acquiredZnode, reachedZnode, abortZnode);
252
253    // If the coordinator was shutdown mid-procedure, then we are going to lose
254    // an procedure that was previously started by cleaning out all the previous state. Its much
255    // harder to figure out how to keep an procedure going and the subject of HBASE-5487.
256    ZKUtil.deleteChildrenRecursivelyMultiOrSequential(watcher, true, acquiredZnode, reachedZnode,
257      abortZnode);
258
259    if (LOG.isTraceEnabled()) {
260      logZKTree(this.baseZNode);
261    }
262  }
263
264  public void clearZNodes(String procedureName) throws KeeperException {
265    LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
266      + acquiredZnode + " " + reachedZnode + " " + abortZnode);
267
268    // Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
269    String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
270    String reachedBarrierNode = getReachedBarrierNode(procedureName);
271    String abortZNode = getAbortZNode(procedureName);
272
273    ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
274    ZKUtil.createAndFailSilent(watcher, abortZNode);
275
276    ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
277      reachedBarrierNode, abortZNode);
278
279    if (LOG.isTraceEnabled()) {
280      logZKTree(this.baseZNode);
281    }
282  }
283}