001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertSame; 022import static org.junit.Assert.fail; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.ArrayDeque; 030import java.util.ArrayList; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Queue; 035import java.util.Set; 036import java.util.function.Function; 037import java.util.stream.Collectors; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.ServerMetrics; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.master.MasterFileSystem; 044import org.apache.hadoop.hbase.master.MasterServices; 045import org.apache.hadoop.hbase.master.ServerListener; 046import org.apache.hadoop.hbase.master.ServerManager; 047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 048import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 049import org.apache.hadoop.hbase.procedure2.Procedure; 050import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 051import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 052import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 053import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 054import org.apache.hadoop.hbase.replication.ReplicationException; 055import org.apache.hadoop.hbase.testclassification.MasterTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.mockito.invocation.InvocationOnMock; 062import org.mockito.stubbing.Answer; 063 064@Category({ MasterTests.class, SmallTests.class }) 065public class TestSyncReplicationReplayWALManager { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestSyncReplicationReplayWALManager.class); 070 071 private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 072 073 private SyncReplicationReplayWALManager manager; 074 075 private MasterProcedureScheduler scheduler; 076 077 private Set<ServerName> onlineServers; 078 079 private List<ServerListener> listeners; 080 081 private Queue<Procedure<?>> wokenProcedures; 082 083 @Before 084 public void setUp() throws IOException, ReplicationException { 085 wokenProcedures = new ArrayDeque<>(); 086 onlineServers = new HashSet<>(); 087 listeners = new ArrayList<>(); 088 ServerManager serverManager = mock(ServerManager.class); 089 doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager) 090 .registerListener(any(ServerListener.class)); 091 ServerMetrics serverMetrics = mock(ServerMetrics.class); 092 doAnswer(inv -> onlineServers.stream() 093 .collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager) 094 .getOnlineServers(); 095 096 MasterFileSystem mfs = mock(MasterFileSystem.class); 097 when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem()); 098 when(mfs.getWALRootDir()).thenReturn(new Path("/")); 099 100 scheduler = mock(MasterProcedureScheduler.class); 101 doAnswer(new Answer<Void>() { 102 103 @Override 104 public Void answer(InvocationOnMock invocation) throws Throwable { 105 ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0]; 106 event.wakeInternal(new MasterProcedureScheduler(pid -> null) { 107 108 @Override 109 public void addFront(Iterator<Procedure> procedureIterator) { 110 procedureIterator.forEachRemaining(wokenProcedures::add); 111 } 112 }); 113 return null; 114 } 115 }).when(scheduler).wakeEvents(any(ProcedureEvent[].class)); 116 MasterProcedureEnv env = mock(MasterProcedureEnv.class); 117 when(env.getProcedureScheduler()).thenReturn(scheduler); 118 ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class); 119 when(procExec.getEnvironment()).thenReturn(env); 120 121 MasterServices services = mock(MasterServices.class); 122 when(services.getServerManager()).thenReturn(serverManager); 123 when(services.getMasterFileSystem()).thenReturn(mfs); 124 when(services.getMasterProcedureExecutor()).thenReturn(procExec); 125 manager = new SyncReplicationReplayWALManager(services); 126 assertEquals(1, listeners.size()); 127 } 128 129 @Test 130 public void testUsedWorkers() throws ProcedureSuspendedException { 131 String peerId1 = "1"; 132 String peerId2 = "2"; 133 ServerName sn1 = ServerName.valueOf("host1", 123, 12345); 134 ServerName sn2 = ServerName.valueOf("host2", 234, 23456); 135 ServerName sn3 = ServerName.valueOf("host3", 345, 34567); 136 onlineServers.add(sn1); 137 manager.registerPeer(peerId1); 138 manager.registerPeer(peerId2); 139 // confirm that different peer ids does not affect each other 140 assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>())); 141 assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>())); 142 onlineServers.add(sn2); 143 assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>())); 144 assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>())); 145 146 NoopProcedure<?> proc = new NoopProcedure<>(); 147 try { 148 manager.acquirePeerWorker(peerId1, proc); 149 fail("Should suspend"); 150 } catch (ProcedureSuspendedException e) { 151 // expected 152 } 153 manager.releasePeerWorker(peerId1, sn1, scheduler); 154 assertEquals(1, wokenProcedures.size()); 155 assertSame(proc, wokenProcedures.poll()); 156 157 assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>())); 158 159 NoopProcedure<?> proc1 = new NoopProcedure<>(); 160 NoopProcedure<?> proc2 = new NoopProcedure<>(); 161 try { 162 manager.acquirePeerWorker(peerId1, proc1); 163 fail("Should suspend"); 164 } catch (ProcedureSuspendedException e) { 165 // expected 166 } 167 try { 168 manager.acquirePeerWorker(peerId1, proc2); 169 fail("Should suspend"); 170 } catch (ProcedureSuspendedException e) { 171 // expected 172 } 173 174 listeners.get(0).serverAdded(sn3); 175 assertEquals(2, wokenProcedures.size()); 176 assertSame(proc2, wokenProcedures.poll()); 177 assertSame(proc1, wokenProcedures.poll()); 178 } 179}