001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.emptyIterable; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertSame; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.stream.Collectors; 035import java.util.stream.Stream; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.master.HMaster; 043import org.apache.hadoop.hbase.master.MasterServices; 044import org.apache.hadoop.hbase.master.ServerManager; 045import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 046import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 047import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 048import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 049import org.apache.hadoop.hbase.replication.ReplicationException; 050import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 051import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 052import org.apache.hadoop.hbase.replication.ReplicationQueueData; 053import org.apache.hadoop.hbase.replication.ReplicationQueueId; 054import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 055import org.apache.hadoop.hbase.testclassification.MasterTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 059import org.junit.After; 060import org.junit.Before; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064 065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 066 067@Category({ MasterTests.class, SmallTests.class }) 068public class TestReplicationLogCleaner { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestReplicationLogCleaner.class); 073 074 private static final Configuration CONF = HBaseConfiguration.create(); 075 076 private MasterServices services; 077 078 private ReplicationLogCleaner cleaner; 079 080 @Before 081 public void setUp() throws ReplicationException { 082 services = mock(MasterServices.class); 083 when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); 084 ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); 085 when(services.getReplicationPeerManager()).thenReturn(rpm); 086 when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); 087 ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); 088 when(rpm.getQueueStorage()).thenReturn(rqs); 089 when(rpm.getQueueStorage().hasData()).thenReturn(true); 090 when(rqs.listAllQueues()).thenReturn(new ArrayList<>()); 091 ServerManager sm = mock(ServerManager.class); 092 when(services.getServerManager()).thenReturn(sm); 093 when(sm.getOnlineServersList()).thenReturn(new ArrayList<>()); 094 @SuppressWarnings("unchecked") 095 ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class); 096 when(services.getMasterProcedureExecutor()).thenReturn(procExec); 097 when(procExec.getProcedures()).thenReturn(new ArrayList<>()); 098 099 cleaner = new ReplicationLogCleaner(); 100 cleaner.setConf(CONF); 101 Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services); 102 cleaner.init(params); 103 } 104 105 @After 106 public void tearDown() { 107 cleaner.postClean(); 108 } 109 110 private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner, 111 Iterable<FileStatus> files) { 112 cleaner.preClean(); 113 return cleaner.getDeletableFiles(files); 114 } 115 116 private static FileStatus createFileStatus(Path path) { 117 return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path); 118 } 119 120 private static FileStatus createFileStatus(ServerName sn, int number) { 121 Path path = new Path(sn.toString() + "." + number); 122 return createFileStatus(path); 123 } 124 125 private static ReplicationPeerDescription createPeer(String peerId) { 126 return new ReplicationPeerDescription(peerId, true, null, null); 127 } 128 129 private void addServer(ServerName serverName) { 130 services.getServerManager().getOnlineServersList().add(serverName); 131 } 132 133 private void addSCP(ServerName serverName, boolean finished) { 134 ServerCrashProcedure scp = mock(ServerCrashProcedure.class); 135 when(scp.getServerName()).thenReturn(serverName); 136 when(scp.isFinished()).thenReturn(finished); 137 services.getMasterProcedureExecutor().getProcedures().add(scp); 138 } 139 140 private void addPeer(String... peerIds) { 141 services.getReplicationPeerManager().listPeers(null).addAll( 142 Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList())); 143 } 144 145 private void addQueueData(ReplicationQueueData... datas) throws ReplicationException { 146 services.getReplicationPeerManager().getQueueStorage().listAllQueues() 147 .addAll(Arrays.asList(datas)); 148 } 149 150 @Test 151 public void testNoConf() { 152 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 153 List<FileStatus> files = Arrays.asList(new FileStatus()); 154 assertSame(files, runCleaner(cleaner, files)); 155 cleaner.postClean(); 156 } 157 158 @Test 159 public void testCanNotFilter() { 160 assertTrue(services.getReplicationLogCleanerBarrier().disable()); 161 List<FileStatus> files = Arrays.asList(new FileStatus()); 162 assertSame(Collections.emptyList(), runCleaner(cleaner, files)); 163 } 164 165 @Test 166 public void testNoPeer() { 167 Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime()); 168 assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); 169 FileStatus file = createFileStatus(path); 170 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 171 assertSame(file, iter.next()); 172 assertFalse(iter.hasNext()); 173 } 174 175 @Test 176 public void testNotValidWalFile() { 177 addPeer("1"); 178 Path path = new Path("/whatever"); 179 assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName())); 180 FileStatus file = createFileStatus(path); 181 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 182 assertSame(file, iter.next()); 183 assertFalse(iter.hasNext()); 184 } 185 186 @Test 187 public void testMetaWalFile() { 188 addPeer("1"); 189 Path path = new Path( 190 "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID); 191 assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); 192 assertTrue(AbstractFSWALProvider.isMetaFile(path)); 193 FileStatus file = createFileStatus(path); 194 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 195 assertSame(file, iter.next()); 196 assertFalse(iter.hasNext()); 197 } 198 199 @Test 200 public void testLiveRegionServerNoQueues() { 201 addPeer("1"); 202 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 203 addServer(sn); 204 List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1)); 205 assertThat(runCleaner(cleaner, files), emptyIterable()); 206 } 207 208 @Test 209 public void testLiveRegionServerWithSCPNoQueues() { 210 addPeer("1"); 211 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 212 addSCP(sn, false); 213 List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1)); 214 assertThat(runCleaner(cleaner, files), emptyIterable()); 215 } 216 217 @Test 218 public void testDeadRegionServerNoQueues() { 219 addPeer("1"); 220 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 221 FileStatus file = createFileStatus(sn, 1); 222 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 223 assertSame(file, iter.next()); 224 assertFalse(iter.hasNext()); 225 } 226 227 @Test 228 public void testDeadRegionServerWithSCPNoQueues() { 229 addPeer("1"); 230 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 231 addSCP(sn, true); 232 FileStatus file = createFileStatus(sn, 1); 233 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 234 assertSame(file, iter.next()); 235 assertFalse(iter.hasNext()); 236 } 237 238 @Test 239 public void testLiveRegionServerMissingQueue() throws ReplicationException { 240 String peerId1 = "1"; 241 String peerId2 = "2"; 242 addPeer(peerId1, peerId2); 243 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 244 addServer(sn); 245 FileStatus file = createFileStatus(sn, 1); 246 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 247 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 248 addQueueData(data1); 249 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 250 } 251 252 @Test 253 public void testLiveRegionServerShouldNotDelete() throws ReplicationException { 254 String peerId = "1"; 255 addPeer(peerId); 256 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 257 addServer(sn); 258 FileStatus file = createFileStatus(sn, 1); 259 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 260 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 261 addQueueData(data); 262 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 263 } 264 265 @Test 266 public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { 267 String peerId1 = "1"; 268 String peerId2 = "2"; 269 addPeer(peerId1, peerId2); 270 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 271 addServer(sn); 272 FileStatus file = createFileStatus(sn, 1); 273 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 274 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 275 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 276 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 277 addQueueData(data1, data2); 278 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 279 } 280 281 @Test 282 public void testLiveRegionServerShouldDelete() throws ReplicationException { 283 String peerId = "1"; 284 addPeer(peerId); 285 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 286 addServer(sn); 287 FileStatus file = createFileStatus(sn, 1); 288 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 289 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 290 services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); 291 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 292 assertSame(file, iter.next()); 293 assertFalse(iter.hasNext()); 294 } 295 296 @Test 297 public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException { 298 String peerId1 = "1"; 299 String peerId2 = "2"; 300 addPeer(peerId1, peerId2); 301 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 302 addServer(sn); 303 FileStatus file = createFileStatus(sn, 1); 304 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 305 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 306 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 307 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 308 addQueueData(data1, data2); 309 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 310 assertSame(file, iter.next()); 311 assertFalse(iter.hasNext()); 312 } 313 314 @Test 315 public void testDeadRegionServerMissingQueue() throws ReplicationException { 316 String peerId1 = "1"; 317 String peerId2 = "2"; 318 addPeer(peerId1, peerId2); 319 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 320 FileStatus file = createFileStatus(sn, 1); 321 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 322 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 323 addQueueData(data1); 324 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 325 assertSame(file, iter.next()); 326 assertFalse(iter.hasNext()); 327 } 328 329 @Test 330 public void testDeadRegionServerShouldNotDelete() throws ReplicationException { 331 String peerId = "1"; 332 addPeer(peerId); 333 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 334 FileStatus file = createFileStatus(sn, 1); 335 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 336 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 337 addQueueData(data); 338 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 339 } 340 341 @Test 342 public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { 343 String peerId1 = "1"; 344 String peerId2 = "2"; 345 addPeer(peerId1, peerId2); 346 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 347 FileStatus file = createFileStatus(sn, 1); 348 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 349 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 350 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 351 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 352 addQueueData(data1, data2); 353 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 354 } 355 356 @Test 357 public void testDeadRegionServerShouldDelete() throws ReplicationException { 358 String peerId = "1"; 359 addPeer(peerId); 360 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 361 FileStatus file = createFileStatus(sn, 1); 362 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 363 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 364 services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); 365 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 366 assertSame(file, iter.next()); 367 assertFalse(iter.hasNext()); 368 } 369 370 @Test 371 public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException { 372 String peerId1 = "1"; 373 String peerId2 = "2"; 374 addPeer(peerId1, peerId2); 375 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 376 FileStatus file = createFileStatus(sn, 1); 377 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 378 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 379 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 380 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 381 addQueueData(data1, data2); 382 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 383 assertSame(file, iter.next()); 384 assertFalse(iter.hasNext()); 385 } 386}