/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.util.Objects;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Instances of this class can be used to watch a file for changes. When a file's modification time
 * changes, the callback provided by the user will be called from a background thread.
 * Modifications are detected by checking the file's attributes every polling interval. Some things
 * to keep in mind:
 * <ul>
 * <li>The callback should be thread-safe.</li>
 * <li>Changes that happen around the time the thread is started may be missed.</li>
 * <li>There is a delay between a file changing and the callback firing.</li>
 * <li>Anything thrown by the callback is logged and otherwise ignored; the watcher keeps
 * running.</li>
 * <li>If the file's attributes cannot be read while polling, the background thread terminates and
 * the watcher ends up in the <code>STOPPED</code> state. It is not restarted.</li>
 * </ul>
 * <p>
 * This file was originally copied from the Apache ZooKeeper project, and then modified.
 * @see <a href=
 *      "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
 *      revision</a>
 */
@InterfaceAudience.Private
public final class FileChangeWatcher {

  /**
   * Receives a notification each time the watched file's last-modified time is seen to change.
   */
  @FunctionalInterface
  public interface FileChangeWatcherCallback {
    /**
     * Called from the watcher's background thread.
     * @param path the path of the watched file.
     */
    void callback(Path path);
  }

  private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);

  enum State {
    NEW, // object created but start() not called yet
    STARTING, // start() called but background thread has not entered main loop
    RUNNING, // background thread is running
    STOPPING, // stop() called but background thread has not exited main loop
    STOPPED // stop() called and background thread has exited, or background thread crashed
  }

  private final WatcherThread watcherThread;
  private State state; // protected by synchronized(this)
  private FileTime lastModifiedTime; // protected by synchronized(lastModifiedTimeLock)
  private final Object lastModifiedTimeLock;
  private final Path filePath;
  private final Duration pollInterval;

  /**
   * Creates a watcher that polls <code>filePath</code> and invokes <code>callback</code> whenever
   * the file's last-modified time differs from the value observed on the previous check. The
   * file's current last-modified time is read here and used as the initial baseline. Nothing is
   * watched until {@link #start()} is called.
   * @param filePath         the file to watch.
   * @param threadNameSuffix appended to <code>FileChangeWatcher-</code> to form the name of the
   *                         background thread.
   * @param pollInterval     how long the background thread sleeps between two checks.
   * @param callback         the callback to invoke, from the background thread, with the watched
   *                         path each time a change is detected.
   * @throws IOException          if the attributes of <code>filePath</code> cannot be read.
   * @throws NullPointerException if <code>filePath</code>, <code>pollInterval</code> or
   *                              <code>callback</code> is null.
   */
  public FileChangeWatcher(Path filePath, String threadNameSuffix, Duration pollInterval,
    FileChangeWatcherCallback callback) throws IOException {
    // Fail fast here instead of letting the background thread die (null pollInterval) or log an
    // error on every change (null callback) long after construction.
    this.filePath = Objects.requireNonNull(filePath, "filePath");
    this.pollInterval = Objects.requireNonNull(pollInterval, "pollInterval");
    Objects.requireNonNull(callback, "callback");

    state = State.NEW;
    lastModifiedTimeLock = new Object();
    lastModifiedTime = Files.readAttributes(filePath, BasicFileAttributes.class).lastModifiedTime();
    this.watcherThread = new WatcherThread(threadNameSuffix, callback);
    this.watcherThread.setDaemon(true);
  }

  /**
   * Returns the current {@link FileChangeWatcher.State}.
   * @return the current state.
   */
  private synchronized State getState() {
    return state;
  }

  /**
   * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
   * thus package-private.
   * @param desiredState the desired state.
   * @throws InterruptedException if the current thread gets interrupted.
   */
  synchronized void waitForState(State desiredState) throws InterruptedException {
    // Re-check in a loop: every state change wakes all waiters, not only those waiting for it.
    while (this.state != desiredState) {
      this.wait();
    }
  }

  /**
   * Sets the state to <code>newState</code> and wakes up every thread blocked in
   * {@link #waitForState(State)}.
   * @param newState the new state.
   */
  private synchronized void setState(State newState) {
    state = newState;
    this.notifyAll();
  }

  /**
   * Atomically sets the state to <code>update</code> if and only if the state is currently
   * <code>expected</code>.
   * @param expected the expected state.
   * @param update   the new state.
   * @return true if the update succeeds, or false if the current state does not equal
   *         <code>expected</code>.
   */
  private synchronized boolean compareAndSetState(State expected, State update) {
    if (state == expected) {
      setState(update);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Atomically sets the state to <code>update</code> if and only if the state is currently one of
   * <code>expectedStates</code>.
   * @param expectedStates the expected states.
   * @param update         the new state.
   * @return true if the update succeeds, or false if the current state does not equal any of the
   *         <code>expectedStates</code>.
   */
  private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
    for (State expected : expectedStates) {
      if (state == expected) {
        setState(update);
        return true;
      }
    }
    return false;
  }

  /**
   * Tells the background thread to start. Does not wait for it to be running. Calling this method
   * more than once has no effect.
   */
  public void start() {
    if (!compareAndSetState(State.NEW, State.STARTING)) {
      // If previous state was not NEW, start() has already been called.
      return;
    }
    this.watcherThread.start();
  }

  /**
   * Tells the background thread to stop. Does not wait for it to exit. Has no effect unless the
   * watcher is currently starting or running.
   */
  public void stop() {
    if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
      // Wake the thread up if it is sleeping between two polls.
      watcherThread.interrupt();
    }
  }

  /**
   * Returns the name of the background thread. Package-private.
   * @return the watcher thread's name.
   */
  String getWatcherThreadName() {
    return watcherThread.getName();
  }

  /**
   * Uncaught-exception handler installed on the watcher thread; logs the failure.
   * @param thread the thread that terminated.
   * @param e      the exception that escaped the thread's <code>run()</code> method.
   */
  private static void handleException(Thread thread, Throwable e) {
    LOG.warn("Exception occurred from thread {}", thread.getName(), e);
  }

  /**
   * Inner class that implements the watcher thread logic.
   */
  private class WatcherThread extends Thread {

    private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";

    private final FileChangeWatcherCallback callback;

    WatcherThread(String threadNameSuffix, FileChangeWatcherCallback callback) {
      super(THREAD_NAME_PREFIX + threadNameSuffix);
      this.callback = callback;
      setUncaughtExceptionHandler(FileChangeWatcher::handleException);
    }

    /**
     * Moves the watcher from STARTING to RUNNING and polls until it is told to stop. Whatever
     * happens, the state is STOPPED once this method returns or throws.
     */
    @Override
    public void run() {
      try {
        LOG.debug("{} thread started", getName());
        if (
          !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
        ) {
          // stop() called shortly after start(), before
          // this thread started running.
          FileChangeWatcher.State state = FileChangeWatcher.this.getState();
          if (state != FileChangeWatcher.State.STOPPING) {
            throw new IllegalStateException("Unexpected state: " + state);
          }
          return;
        }
        runLoop();
      } catch (Exception e) {
        LOG.warn("Error in runLoop()", e);
        // Rethrown so that it also reaches the thread's uncaught-exception handler.
        throw new RuntimeException(e);
      } finally {
        LOG.debug("{} thread finished", getName());
        FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
      }
    }

    /**
     * Polls the watched file while the watcher is in the RUNNING state: reads the file's
     * attributes, fires the callback if the last-modified time differs from the remembered one,
     * then sleeps for the poll interval. Returns when the state is no longer RUNNING or when the
     * sleep is interrupted.
     * @throws IOException if the file's attributes cannot be read.
     */
    private void runLoop() throws IOException {
      while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
        BasicFileAttributes attributes = Files.readAttributes(filePath, BasicFileAttributes.class);
        boolean modified = false;
        synchronized (lastModifiedTimeLock) {
          FileTime maybeNewLastModifiedTime = attributes.lastModifiedTime();
          if (!lastModifiedTime.equals(maybeNewLastModifiedTime)) {
            modified = true;
            lastModifiedTime = maybeNewLastModifiedTime;
          }
        }

        // avoid calling callback while holding lock
        if (modified) {
          try {
            callback.callback(filePath);
          } catch (Throwable e) {
            // A failing callback must not take the watcher down.
            LOG.error("Error from callback", e);
          }
        }

        try {
          Thread.sleep(pollInterval.toMillis());
        } catch (InterruptedException e) {
          LOG.debug("Interrupted", e);
          // Preserve the interrupt status for whoever inspects this thread afterwards.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }
}