001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.doThrow; 025import static org.mockito.Mockito.spy; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Iterator; 030import java.util.List; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.ChoreService; 037import org.apache.hadoop.hbase.CoordinatedStateManager; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.Server; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.ZooKeeperConnectionException; 044import org.apache.hadoop.hbase.client.ClusterConnection; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.master.HMaster; 047import org.apache.hadoop.hbase.replication.ReplicationException; 048import org.apache.hadoop.hbase.replication.ReplicationFactory; 049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 050import org.apache.hadoop.hbase.replication.ReplicationPeers; 051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 053import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; 054import org.apache.hadoop.hbase.testclassification.MasterTests; 055import org.apache.hadoop.hbase.testclassification.SmallTests; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.util.Pair; 058import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 059import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 060import org.apache.zookeeper.KeeperException; 061import org.apache.zookeeper.data.Stat; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 073 074@Category({ MasterTests.class, SmallTests.class }) 075public class TestReplicationHFileCleaner { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestReplicationHFileCleaner.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class); 082 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 private static Server server; 084 private static ReplicationQueueStorage rq; 085 private static ReplicationPeers rp; 086 private static final String peerId = "TestReplicationHFileCleaner"; 087 private static Configuration conf = TEST_UTIL.getConfiguration(); 088 static FileSystem fs = null; 089 Path root; 090 091 @BeforeClass 092 public static void setUpBeforeClass() throws Exception { 093 TEST_UTIL.startMiniZKCluster(); 094 server = new DummyServer(); 095 conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 096 HMaster.decorateMasterConfiguration(conf); 097 rp = 098 ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf); 099 rp.init(); 100 rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); 101 fs = FileSystem.get(conf); 102 } 103 104 @AfterClass 105 public static void tearDownAfterClass() throws Exception { 106 TEST_UTIL.shutdownMiniZKCluster(); 107 } 108 109 @Before 110 public void setup() throws ReplicationException, IOException { 111 root = TEST_UTIL.getDataTestDirOnTestFS(); 112 rp.getPeerStorage().addPeer(peerId, 113 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true); 114 rq.addPeerToHFileRefs(peerId); 115 } 116 117 @After 118 public void cleanup() throws ReplicationException { 119 try { 120 fs.delete(root, true); 121 } catch (IOException e) { 122 LOG.warn("Failed to delete files recursively from path " + root); 123 } 124 // Remove all HFileRefs (if any) 125 rq.removeHFileRefs(peerId, rq.getReplicableHFiles(peerId)); 126 rp.getPeerStorage().removePeer(peerId); 127 } 128 129 @Test 130 public void testIsFileDeletable() throws IOException, ReplicationException { 131 // 1. Create a file 132 Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs"); 133 fs.createNewFile(file); 134 // 2. Assert file is successfully created 135 assertTrue("Test file not created!", fs.exists(file)); 136 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 137 cleaner.setConf(conf); 138 // 3. Assert that file as is should be deletable 139 assertTrue("Cleaner should allow to delete this file as there is no hfile reference node " 140 + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); 141 142 List<Pair<Path, Path>> files = new ArrayList<>(1); 143 files.add(new Pair<>(null, file)); 144 // 4. Add the file to hfile-refs queue 145 rq.addHFileRefs(peerId, files); 146 // 5. Assert file should not be deletable 147 assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node " 148 + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); 149 } 150 151 @Test 152 public void testGetDeletableFiles() throws Exception { 153 // 1. Create two files and assert that they do not exist 154 Path notDeletablefile = new Path(root, "testGetDeletableFiles_1"); 155 fs.createNewFile(notDeletablefile); 156 assertTrue("Test file not created!", fs.exists(notDeletablefile)); 157 Path deletablefile = new Path(root, "testGetDeletableFiles_2"); 158 fs.createNewFile(deletablefile); 159 assertTrue("Test file not created!", fs.exists(deletablefile)); 160 161 List<FileStatus> files = new ArrayList<>(2); 162 FileStatus f = new FileStatus(); 163 f.setPath(deletablefile); 164 files.add(f); 165 f = new FileStatus(); 166 f.setPath(notDeletablefile); 167 files.add(f); 168 169 List<Pair<Path, Path>> hfiles = new ArrayList<>(1); 170 hfiles.add(new Pair<>(null, notDeletablefile)); 171 // 2. Add one file to hfile-refs queue 172 rq.addHFileRefs(peerId, hfiles); 173 174 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 175 cleaner.setConf(conf); 176 Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator(); 177 int i = 0; 178 while (deletableFilesIterator.hasNext() && i < 2) { 179 i++; 180 } 181 // 5. Assert one file should not be deletable and it is present in the list returned 182 if (i > 2) { 183 fail("File " + notDeletablefile 184 + " should not be deletable as its hfile reference node is not added."); 185 } 186 assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile)); 187 } 188 189 /** 190 * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting. 191 */ 192 @Test 193 public void testZooKeeperAbort() throws Exception { 194 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 195 196 List<FileStatus> dummyFiles = Lists.newArrayList( 197 new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("hfile1")), 198 new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("hfile2"))); 199 200 FaultyZooKeeperWatcher faultyZK = 201 new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); 202 try { 203 faultyZK.init(); 204 cleaner.setConf(conf, faultyZK); 205 // should keep all files due to a ConnectionLossException getting the queues znodes 206 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); 207 assertFalse(toDelete.iterator().hasNext()); 208 assertFalse(cleaner.isStopped()); 209 } finally { 210 faultyZK.close(); 211 } 212 213 // when zk is working both files should be returned 214 cleaner = new ReplicationHFileCleaner(); 215 ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); 216 try { 217 cleaner.setConf(conf, zkw); 218 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 219 Iterator<FileStatus> iter = filesToDelete.iterator(); 220 assertTrue(iter.hasNext()); 221 assertEquals(new Path("hfile1"), iter.next().getPath()); 222 assertTrue(iter.hasNext()); 223 assertEquals(new Path("hfile2"), iter.next().getPath()); 224 assertFalse(iter.hasNext()); 225 } finally { 226 zkw.close(); 227 } 228 } 229 230 static class DummyServer implements Server { 231 232 @Override 233 public Configuration getConfiguration() { 234 return TEST_UTIL.getConfiguration(); 235 } 236 237 @Override 238 public ZKWatcher getZooKeeper() { 239 try { 240 return new ZKWatcher(getConfiguration(), "dummy server", this); 241 } catch (IOException e) { 242 LOG.error("Can not get ZKWatcher", e); 243 } 244 return null; 245 } 246 247 @Override 248 public FileSystem getFileSystem() { 249 try { 250 return TEST_UTIL.getTestFileSystem(); 251 } catch (IOException e) { 252 LOG.error("Can not get FileSystem", e); 253 } 254 return null; 255 } 256 257 @Override 258 public CoordinatedStateManager getCoordinatedStateManager() { 259 return null; 260 } 261 262 @Override 263 public ClusterConnection getConnection() { 264 return null; 265 } 266 267 @Override 268 public ServerName getServerName() { 269 return ServerName.valueOf("regionserver,60020,000000"); 270 } 271 272 @Override 273 public void abort(String why, Throwable e) { 274 } 275 276 @Override 277 public boolean isAborted() { 278 return false; 279 } 280 281 @Override 282 public void stop(String why) { 283 } 284 285 @Override 286 public boolean isStopped() { 287 return false; 288 } 289 290 @Override 291 public ChoreService getChoreService() { 292 return null; 293 } 294 295 @Override 296 public ClusterConnection getClusterConnection() { 297 return null; 298 } 299 300 @Override 301 public boolean isStopping() { 302 return false; 303 } 304 305 @Override 306 public Connection createConnection(Configuration conf) throws IOException { 307 return null; 308 } 309 } 310 311 static class FaultyZooKeeperWatcher extends ZKWatcher { 312 private RecoverableZooKeeper zk; 313 314 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) 315 throws ZooKeeperConnectionException, IOException { 316 super(conf, identifier, abortable); 317 } 318 319 public void init() throws Exception { 320 this.zk = spy(super.getRecoverableZooKeeper()); 321 doThrow(new KeeperException.ConnectionLossException()).when(zk) 322 .getData("/hbase/replication/hfile-refs", null, new Stat()); 323 } 324 325 @Override 326 public RecoverableZooKeeper getRecoverableZooKeeper() { 327 return zk; 328 } 329 } 330}