/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

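/**
 * Tests for the {@link LogCleaner} chore: expired WALs and old procedure WALs in the oldWALs
 * directory are removed, while recent files and WALs still referenced by replication are kept.
 */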
@Category({ MasterTests.class, MediumTests.class })
public class TestLogsCleaner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogsCleaner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final Path OLD_WALS_DIR =
    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);

  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");

  private static Configuration conf;

  private static DirScanPool POOL;

  private static String peerId = "1";

  private MasterServices masterServices;

  private ReplicationQueueStorage queueStorage;

  @Rule
  public final TableNameTestRule tableNameRule = new TableNameTestRule();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    POOL.shutdownNow();
  }

  @Before
  public void beforeTest() throws Exception {
    conf = TEST_UTIL.getConfiguration();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();

    fs.delete(OLD_WALS_DIR, true);

    // recreate the oldWALs root directory
    fs.mkdirs(OLD_WALS_DIR);

    TableName tableName = tableNameRule.getTableName();
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    TEST_UTIL.getAdmin().createTable(td);
    TEST_UTIL.waitTableAvailable(tableName);
    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
      conf, tableName);

    // Mock a MasterServices that exposes the replication queue storage above, an initially empty
    // (mutable) peer list, no online region servers and no running procedures.
    masterServices = mock(MasterServices.class);
    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
    when(masterServices.getReplicationLogCleanerBarrier())
      .thenReturn(new ReplicationLogCleanerBarrier());
    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
    when(rpm.getQueueStorage()).thenReturn(queueStorage);
    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
    ServerManager sm = mock(ServerManager.class);
    when(masterServices.getServerManager()).thenReturn(sm);
    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
    @SuppressWarnings("unchecked")
    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
  }

  /**
   * This test verifies that LogCleaner works correctly with WALs and Procedure WALs located in the
   * same oldWALs directory.
   * <p/>
   * Created files:
   * <ul>
   * <li>2 invalid files</li>
   * <li>5 old Procedure WALs</li>
   * <li>30 old WALs, of which 3 are in replication</li>
   * <li>5 recent Procedure WALs</li>
   * <li>1 recent WAL</li>
   * <li>1 very new WAL (timestamp in future)</li>
   * <li>masterProcedureWALs subdirectory</li>
   * </ul>
   * Files which should stay:
   * <ul>
   * <li>3 replication WALs</li>
   * <li>2 new WALs</li>
   * <li>5 latest Procedure WALs</li>
   * <li>masterProcedureWALs subdirectory</li>
   * </ul>
   */
  @Test
  public void testLogCleaning() throws Exception {
    // set TTLs
    long ttlWAL = 2000;
    long ttlProcedureWAL = 4000;
    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);

    HMaster.decorateMasterConfiguration(conf);
    Server server = new DummyServer();
    String fakeMachineName =
      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());

    final FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);

    final long now = EnvironmentEdgeManager.currentTime();

    // Case 1: 2 invalid files, which will be deleted directly
    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));

    // Case 2: 5 old Procedure WALs, which will be deleted
    for (int i = 1; i <= 5; i++) {
      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time so the procedure WALs above become old
    Thread.sleep(ttlProcedureWAL - ttlWAL);

    // Case 3: 30 old WALs, which will be deletable
    for (int i = 1; i <= 30; i++) {
      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
      fs.createNewFile(fileName);
    }
    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
    masterServices.getReplicationPeerManager().listPeers(null)
      .add(new ReplicationPeerDescription(peerId, true, null, null));
    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
    // Case 5: 5 new Procedure WALs, which will stay
    for (int i = 6; i <= 10; i++) {
      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time to get a newer modification time
    Thread.sleep(ttlWAL);
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));

    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
    // so we are not going down the chain
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
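
    // At this point oldWALs holds 2 invalid files + 30 old WALs + 1 recent WAL + 1 future-dated
    // WAL = 34 files, plus the masterProcedureWALs subdirectory.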
    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    LOG.info("File status: {}", Arrays.toString(status));

    // There should be 34 files and 1 masterProcedureWALs directory
    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
    // 10 procedure WALs
    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
      ImmutableMap.of(HMaster.MASTER, masterServices));
    cleaner.chore();

    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
    // are scheduled for replication and the masterProcedureWALs directory
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
    // In masterProcedureWALs we end up with the 5 newer Procedure WALs
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    if (LOG.isDebugEnabled()) {
      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
      LOG.debug("Kept log files for oldWALs: {}", Arrays.toString(statusOldWALs));
      LOG.debug("Kept log files for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    // Prepare environments
    Server server = new DummyServer();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
      ImmutableMap.of(HMaster.MASTER, masterServices));
    int size = cleaner.getSizeOfCleaners();
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      cleaner.getCleanerThreadTimeoutMsec());
    // Create dir and files for the test
    int numOfFiles = 10;
    createFiles(fs, OLD_WALS_DIR, numOfFiles);
    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(numOfFiles, status.length);
    // Start the cleaner chore
    Thread thread = new Thread(() -> cleaner.chore());
    thread.setDaemon(true);
    thread.start();
    // Change the number of cleaner threads and their timeout dynamically
    int sizeToChange = 4;
    long threadTimeoutToChange = 30 * 1000L;
    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
    cleaner.onConfigurationChange(conf);
    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
    // Wait for the chore to finish; all created files should have been cleaned up
    thread.join();
    status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(0, status.length);
  }

  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
    for (int i = 0; i < numOfFiles; i++) {
      // size of each file is 2M, 3M, or 4M
      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
      Bytes.random(M);
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        fsdos.write(M);
      }
    }
  }

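  // Minimal Server implementation for the cleaner chore: it supplies the real test configuration
  // and a live ZKWatcher, and inherits stub behavior for everything else from MockServer.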
  private static final class DummyServer extends MockServer {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZKWatcher getZooKeeper() {
      try {
        return new ZKWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }
  }
}