/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link FileChangeWatcher}: each test registers a watcher on a temporary directory,
 * changes a file in it, and checks which {@link WatchEvent}s reach the callback.
 * <p>
 * This file has been copied from the Apache ZooKeeper project.
 * @see <a href=
 *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
 *      revision</a>
 */
@Category({ IOTests.class, SmallTests.class })
public class TestFileChangeWatcher {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);

  /** Directory watched by every test; created once in {@link #createTempFile()}. */
  private static File tempDir;
  /** File inside {@link #tempDir} that the modify/touch tests change. */
  private static File tempFile;

  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

  /** Upper bound, in milliseconds, on how long a test waits for one expected callback. */
  private static final long FS_TIMEOUT = 30000L;

  /** Pause, in milliseconds, between the watcher reporting RUNNING and the first file change. */
  private static final long WATCHER_SETTLE_MILLIS = 1000L;

  private static final long NANOS_PER_MILLI = 1_000_000L;

  @BeforeClass
  public static void createTempFile() throws IOException {
    tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString())
      .getCanonicalFile();
    FileUtils.forceMkdir(tempDir);
    tempFile = File.createTempFile("zk_test_", "", tempDir);
  }

  @AfterClass
  public static void cleanupTempDir() {
    UTIL.cleanupTestDir();
  }

  /**
   * Builds (but does not start) a watcher on {@link #tempDir} whose callback logs every event and
   * appends it to {@code events}, notifying any thread waiting on that list's monitor.
   * @param events             list that receives the events; also used as the monitor
   * @param ignoreCreateEvents when {@code true}, {@code ENTRY_CREATE} events are logged but not
   *                           recorded
   */
  private static FileChangeWatcher newRecordingWatcher(final List<WatchEvent<?>> events,
    final boolean ignoreCreateEvents) throws IOException {
    return new FileChangeWatcher(tempDir.toPath(), "test", event -> {
      LOG.info("Got an update: {} {}", event.kind(), event.context());
      // Filter out the extra ENTRY_CREATE events that are
      // sometimes seen at the start. Even though we create the watcher
      // after the file exists, sometimes we still get a create event.
      if (ignoreCreateEvents && StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
        return;
      }
      synchronized (events) {
        events.add(event);
        events.notifyAll();
      }
    });
  }

  /** Starts {@code watcher}, waits for it to report RUNNING, then pauses briefly. */
  private static void startAndSettle(FileChangeWatcher watcher)
    throws IOException, InterruptedException {
    watcher.start();
    watcher.waitForState(FileChangeWatcher.State.RUNNING);
    Thread.sleep(WATCHER_SETTLE_MILLIS); // TODO hack
  }

  /** Stops {@code watcher} and waits for it to report STOPPED; does nothing for {@code null}. */
  private static void stopAndAwait(FileChangeWatcher watcher)
    throws IOException, InterruptedException {
    if (watcher != null) {
      watcher.stop();
      watcher.waitForState(FileChangeWatcher.State.STOPPED);
    }
  }

  /** Clears {@code ref} and stops the watcher it held, if any. */
  private static void stopAndClear(AtomicReference<FileChangeWatcher> ref) throws IOException {
    FileChangeWatcher watcher = ref.getAndSet(null);
    if (watcher != null) {
      watcher.stop();
    }
  }

  /**
   * Blocks until {@code events} holds at least {@code minCount} entries or {@link #FS_TIMEOUT}
   * elapses, whichever comes first. The condition is re-checked after every wakeup so that a
   * spurious wakeup cannot end the wait early. Callers assert on the list afterwards.
   */
  private static void awaitEventCount(List<WatchEvent<?>> events, int minCount)
    throws InterruptedException {
    final long deadlineNanos = System.nanoTime() + FS_TIMEOUT * NANOS_PER_MILLI;
    synchronized (events) {
      while (events.size() < minCount) {
        long remainingMillis = (deadlineNanos - System.nanoTime()) / NANOS_PER_MILLI;
        if (remainingMillis <= 0) {
          // Never call wait(0): that would block without a timeout.
          return;
        }
        events.wait(remainingMillis);
      }
    }
  }

  /**
   * Blocks until {@code callCount} exceeds {@code threshold} or {@link #FS_TIMEOUT} elapses,
   * whichever comes first. Uses {@code callCount} itself as the monitor, matching the callback.
   */
  private static void awaitCallCountAbove(AtomicInteger callCount, int threshold)
    throws InterruptedException {
    final long deadlineNanos = System.nanoTime() + FS_TIMEOUT * NANOS_PER_MILLI;
    synchronized (callCount) {
      while (callCount.get() <= threshold) {
        long remainingMillis = (deadlineNanos - System.nanoTime()) / NANOS_PER_MILLI;
        if (remainingMillis <= 0) {
          return;
        }
        callCount.wait(remainingMillis);
      }
    }
  }

  /**
   * With identical keystore and truststore paths only the keystore watcher is created; with
   * distinct paths both are created, and each watcher thread name ends with its file name.
   */
  @Test
  public void testEnableCertFileReloading() throws IOException {
    Configuration myConf = new Configuration();
    String sharedPath = "/tmp/foo.jks";
    myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
    AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
    AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
    try {
      X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
      });
      assertNotNull(keystoreWatcher.get());
      assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
      assertNull(truststoreWatcher.get());

      stopAndClear(keystoreWatcher);
      stopAndClear(truststoreWatcher);

      String truststorePath = "/tmp/bar.jks";
      myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
      X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
      });

      assertNotNull(keystoreWatcher.get());
      assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
      assertNotNull(truststoreWatcher.get());
      assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks"));
    } finally {
      // Stop whatever is still registered, even when an assertion above failed.
      stopAndClear(keystoreWatcher);
      stopAndClear(truststoreWatcher);
    }
  }

  /** Appending to the watched file three times yields one ENTRY_MODIFY per append. */
  @Test
  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
    FileChangeWatcher watcher = null;
    try {
      final List<WatchEvent<?>> events = new ArrayList<>();
      watcher = newRecordingWatcher(events, true);
      startAndSettle(watcher);
      for (int i = 0; i < 3; i++) {
        LOG.info("Modifying file, attempt {}", (i + 1));
        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
          true);
        synchronized (events) {
          awaitEventCount(events, i + 1);
          assertEquals("Wrong number of events", i + 1, events.size());
          WatchEvent<?> event = events.get(i);
          assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
          assertEquals(tempFile.getName(), event.context().toString());
        }
      }
    } finally {
      stopAndAwait(watcher);
    }
  }

  /** Touching the watched file yields an ENTRY_MODIFY for it. */
  @Test
  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
    FileChangeWatcher watcher = null;
    try {
      final List<WatchEvent<?>> events = new ArrayList<>();
      watcher = newRecordingWatcher(events, true);
      startAndSettle(watcher);
      LOG.info("Touching file");
      FileUtils.touch(tempFile);
      synchronized (events) {
        awaitEventCount(events, 1);
        assertFalse(events.isEmpty());
        WatchEvent<?> event = events.get(0);
        assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
        assertEquals(tempFile.getName(), event.context().toString());
      }
    } finally {
      stopAndAwait(watcher);
    }
  }

  /** Creating a new file in the watched directory yields an ENTRY_CREATE for it. */
  @Test
  public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException {
    FileChangeWatcher watcher = null;
    try {
      final List<WatchEvent<?>> events = new ArrayList<>();
      // ENTRY_CREATE is the event under test here, so nothing is filtered.
      watcher = newRecordingWatcher(events, false);
      startAndSettle(watcher);
      File tempFile2 = File.createTempFile("zk_test_", "", tempDir);
      tempFile2.deleteOnExit();
      synchronized (events) {
        awaitEventCount(events, 1);
        assertFalse(events.isEmpty());
        WatchEvent<?> event = events.get(0);
        assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind());
        assertEquals(tempFile2.getName(), event.context().toString());
      }
    } finally {
      stopAndAwait(watcher);
    }
  }

  /** Deleting a file from the watched directory yields an ENTRY_DELETE for it. */
  @Test
  public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException {
    // Use a file owned by this test: deleting the shared tempFile would make the
    // modify/touch tests depend on the order in which JUnit happens to run them.
    final File doomedFile = File.createTempFile("zk_test_", "", tempDir);
    FileChangeWatcher watcher = null;
    try {
      final List<WatchEvent<?>> events = new ArrayList<>();
      watcher = newRecordingWatcher(events, true);
      startAndSettle(watcher);
      assertTrue("Could not delete " + doomedFile, doomedFile.delete());
      synchronized (events) {
        awaitEventCount(events, 1);
        assertFalse(events.isEmpty());
        WatchEvent<?> event = events.get(0);
        assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind());
        assertEquals(doomedFile.getName(), event.context().toString());
      }
    } finally {
      stopAndAwait(watcher);
    }
  }

  /** A callback that throws on its first invocation must still be invoked for later events. */
  @Test
  public void testCallbackErrorDoesNotCrashWatcherThread()
    throws IOException, InterruptedException {
    FileChangeWatcher watcher = null;
    try {
      final AtomicInteger callCount = new AtomicInteger(0);
      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
        LOG.info("Got an update: {} {}", event.kind(), event.context());
        int oldValue;
        synchronized (callCount) {
          oldValue = callCount.getAndIncrement();
          callCount.notifyAll();
        }
        if (oldValue == 0) {
          throw new RuntimeException("This error should not crash the watcher thread");
        }
      });
      startAndSettle(watcher);
      LOG.info("Modifying file");
      FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
      awaitCallCountAbove(callCount, 0);
      assertTrue("Callback was never invoked for the first modification", callCount.get() > 0);
      LOG.info("Modifying file again");
      FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
      awaitCallCountAbove(callCount, 1);
      // The value of callCount can exceed 1 only if the callback thread
      // survives the exception thrown by the first callback.
      assertTrue(callCount.get() > 1);
    } finally {
      stopAndAwait(watcher);
    }
  }
}