001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.nio.file.Files;
022import java.nio.file.Path;
023import java.nio.file.attribute.BasicFileAttributes;
024import java.nio.file.attribute.FileTime;
025import java.time.Duration;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Instances of this class can be used to watch a file for changes. When a file's modification time
032 * changes, the callback provided by the user will be called from a background thread. Modification
033 * are detected by checking the file's attributes every polling interval. Some things to keep in
034 * mind:
035 * <ul>
036 * <li>The callback should be thread-safe.</li>
037 * <li>Changes that happen around the time the thread is started may be missed.</li>
038 * <li>There is a delay between a file changing and the callback firing.</li>
039 * </ul>
040 * <p/>
041 * This file was originally copied from the Apache ZooKeeper project, and then modified.
042 * @see <a href=
043 *      "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
044 *      revision</a>
045 */
046@InterfaceAudience.Private
047public final class FileChangeWatcher {
048
049  public interface FileChangeWatcherCallback {
050    void callback(Path path);
051  }
052
053  private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
054
055  enum State {
056    NEW, // object created but start() not called yet
057    STARTING, // start() called but background thread has not entered main loop
058    RUNNING, // background thread is running
059    STOPPING, // stop() called but background thread has not exited main loop
060    STOPPED // stop() called and background thread has exited, or background thread crashed
061  }
062
063  private final WatcherThread watcherThread;
064  private State state; // protected by synchronized(this)
065  private FileTime lastModifiedTime;
066  private final Object lastModifiedTimeLock;
067  private final Path filePath;
068  private final Duration pollInterval;
069
070  /**
071   * Creates a watcher that watches <code>filePath</code> and invokes <code>callback</code> on
072   * changes.
073   * @param filePath the file to watch.
074   * @param callback the callback to invoke with events. <code>event.kind()</code> will return the
075   *                 type of event, and <code>event.context()</code> will return the filename
076   *                 relative to <code>dirPath</code>.
077   * @throws IOException if there is an error creating the WatchService.
078   */
079  public FileChangeWatcher(Path filePath, String threadNameSuffix, Duration pollInterval,
080    FileChangeWatcherCallback callback) throws IOException {
081    this.filePath = filePath;
082    this.pollInterval = pollInterval;
083
084    state = State.NEW;
085    lastModifiedTimeLock = new Object();
086    lastModifiedTime = Files.readAttributes(filePath, BasicFileAttributes.class).lastModifiedTime();
087    this.watcherThread = new WatcherThread(threadNameSuffix, callback);
088    this.watcherThread.setDaemon(true);
089  }
090
091  /**
092   * Returns the current {@link FileChangeWatcher.State}.
093   * @return the current state.
094   */
095  private synchronized State getState() {
096    return state;
097  }
098
099  /**
100   * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
101   * thus package-private.
102   * @param desiredState the desired state.
103   * @throws InterruptedException if the current thread gets interrupted.
104   */
105  synchronized void waitForState(State desiredState) throws InterruptedException {
106    while (this.state != desiredState) {
107      this.wait();
108    }
109  }
110
111  /**
112   * Sets the state to <code>newState</code>.
113   * @param newState the new state.
114   */
115  private synchronized void setState(State newState) {
116    state = newState;
117    this.notifyAll();
118  }
119
120  /**
121   * Atomically sets the state to <code>update</code> if and only if the state is currently
122   * <code>expected</code>.
123   * @param expected the expected state.
124   * @param update   the new state.
125   * @return true if the update succeeds, or false if the current state does not equal
126   *         <code>expected</code>.
127   */
128  private synchronized boolean compareAndSetState(State expected, State update) {
129    if (state == expected) {
130      setState(update);
131      return true;
132    } else {
133      return false;
134    }
135  }
136
137  /**
138   * Atomically sets the state to <code>update</code> if and only if the state is currently one of
139   * <code>expectedStates</code>.
140   * @param expectedStates the expected states.
141   * @param update         the new state.
142   * @return true if the update succeeds, or false if the current state does not equal any of the
143   *         <code>expectedStates</code>.
144   */
145  private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
146    for (State expected : expectedStates) {
147      if (state == expected) {
148        setState(update);
149        return true;
150      }
151    }
152    return false;
153  }
154
155  /**
156   * Tells the background thread to start. Does not wait for it to be running. Calling this method
157   * more than once has no effect.
158   */
159  public void start() {
160    if (!compareAndSetState(State.NEW, State.STARTING)) {
161      // If previous state was not NEW, start() has already been called.
162      return;
163    }
164    this.watcherThread.start();
165  }
166
167  /**
168   * Tells the background thread to stop. Does not wait for it to exit.
169   */
170  public void stop() {
171    if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
172      watcherThread.interrupt();
173    }
174  }
175
176  String getWatcherThreadName() {
177    return watcherThread.getName();
178  }
179
180  private static void handleException(Thread thread, Throwable e) {
181    LOG.warn("Exception occurred from thread {}", thread.getName(), e);
182  }
183
184  /**
185   * Inner class that implements the watcher thread logic.
186   */
187  private class WatcherThread extends Thread {
188
189    private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
190
191    final FileChangeWatcherCallback callback;
192
193    WatcherThread(String threadNameSuffix, FileChangeWatcherCallback callback) {
194      super(THREAD_NAME_PREFIX + threadNameSuffix);
195      this.callback = callback;
196      setUncaughtExceptionHandler(FileChangeWatcher::handleException);
197    }
198
199    @Override
200    public void run() {
201      try {
202        LOG.debug("{} thread started", getName());
203        if (
204          !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
205        ) {
206          // stop() called shortly after start(), before
207          // this thread started running.
208          FileChangeWatcher.State state = FileChangeWatcher.this.getState();
209          if (state != FileChangeWatcher.State.STOPPING) {
210            throw new IllegalStateException("Unexpected state: " + state);
211          }
212          return;
213        }
214        runLoop();
215      } catch (Exception e) {
216        LOG.warn("Error in runLoop()", e);
217        throw new RuntimeException(e);
218      } finally {
219        LOG.debug("{} thread finished", getName());
220        FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
221      }
222    }
223
224    private void runLoop() throws IOException {
225      while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
226        BasicFileAttributes attributes = Files.readAttributes(filePath, BasicFileAttributes.class);
227        boolean modified = false;
228        synchronized (lastModifiedTimeLock) {
229          FileTime maybeNewLastModifiedTime = attributes.lastModifiedTime();
230          if (!lastModifiedTime.equals(maybeNewLastModifiedTime)) {
231            modified = true;
232            lastModifiedTime = maybeNewLastModifiedTime;
233          }
234        }
235
236        // avoid calling callback while holding lock
237        if (modified) {
238          try {
239            callback.callback(filePath);
240          } catch (Throwable e) {
241            LOG.error("Error from callback", e);
242          }
243        }
244
245        try {
246          Thread.sleep(pollInterval.toMillis());
247        } catch (InterruptedException e) {
248          LOG.debug("Interrupted", e);
249          Thread.currentThread().interrupt();
250          return;
251        }
252      }
253    }
254  }
255}