001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertSame;
022import static org.junit.Assert.fail;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.ArrayDeque;
030import java.util.ArrayList;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Queue;
035import java.util.Set;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.ServerMetrics;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.master.MasterFileSystem;
044import org.apache.hadoop.hbase.master.MasterServices;
045import org.apache.hadoop.hbase.master.ServerListener;
046import org.apache.hadoop.hbase.master.ServerManager;
047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
048import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
049import org.apache.hadoop.hbase.procedure2.Procedure;
050import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
051import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
052import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
053import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
054import org.apache.hadoop.hbase.replication.ReplicationException;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.junit.Before;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.mockito.invocation.InvocationOnMock;
062import org.mockito.stubbing.Answer;
063
064@Category({ MasterTests.class, SmallTests.class })
065public class TestSyncReplicationReplayWALManager {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestSyncReplicationReplayWALManager.class);
070
071  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
072
073  private SyncReplicationReplayWALManager manager;
074
075  private MasterProcedureScheduler scheduler;
076
077  private Set<ServerName> onlineServers;
078
079  private List<ServerListener> listeners;
080
081  private Queue<Procedure<?>> wokenProcedures;
082
083  @Before
084  public void setUp() throws IOException, ReplicationException {
085    wokenProcedures = new ArrayDeque<>();
086    onlineServers = new HashSet<>();
087    listeners = new ArrayList<>();
088    ServerManager serverManager = mock(ServerManager.class);
089    doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
090      .registerListener(any(ServerListener.class));
091    ServerMetrics serverMetrics = mock(ServerMetrics.class);
092    doAnswer(inv -> onlineServers.stream()
093      .collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
094        .getOnlineServers();
095
096    MasterFileSystem mfs = mock(MasterFileSystem.class);
097    when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
098    when(mfs.getWALRootDir()).thenReturn(new Path("/"));
099
100    scheduler = mock(MasterProcedureScheduler.class);
101    doAnswer(new Answer<Void>() {
102
103      @Override
104      public Void answer(InvocationOnMock invocation) throws Throwable {
105        ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
106        event.wakeInternal(new MasterProcedureScheduler(pid -> null) {
107
108          @Override
109          public void addFront(Iterator<Procedure> procedureIterator) {
110            procedureIterator.forEachRemaining(wokenProcedures::add);
111          }
112        });
113        return null;
114      }
115    }).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
116    MasterProcedureEnv env = mock(MasterProcedureEnv.class);
117    when(env.getProcedureScheduler()).thenReturn(scheduler);
118    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
119    when(procExec.getEnvironment()).thenReturn(env);
120
121    MasterServices services = mock(MasterServices.class);
122    when(services.getServerManager()).thenReturn(serverManager);
123    when(services.getMasterFileSystem()).thenReturn(mfs);
124    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
125    manager = new SyncReplicationReplayWALManager(services);
126    assertEquals(1, listeners.size());
127  }
128
129  @Test
130  public void testUsedWorkers() throws ProcedureSuspendedException {
131    String peerId1 = "1";
132    String peerId2 = "2";
133    ServerName sn1 = ServerName.valueOf("host1", 123, 12345);
134    ServerName sn2 = ServerName.valueOf("host2", 234, 23456);
135    ServerName sn3 = ServerName.valueOf("host3", 345, 34567);
136    onlineServers.add(sn1);
137    manager.registerPeer(peerId1);
138    manager.registerPeer(peerId2);
139    // confirm that different peer ids does not affect each other
140    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
141    assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
142    onlineServers.add(sn2);
143    assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
144    assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
145
146    NoopProcedure<?> proc = new NoopProcedure<>();
147    try {
148      manager.acquirePeerWorker(peerId1, proc);
149      fail("Should suspend");
150    } catch (ProcedureSuspendedException e) {
151      // expected
152    }
153    manager.releasePeerWorker(peerId1, sn1, scheduler);
154    assertEquals(1, wokenProcedures.size());
155    assertSame(proc, wokenProcedures.poll());
156
157    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
158
159    NoopProcedure<?> proc1 = new NoopProcedure<>();
160    NoopProcedure<?> proc2 = new NoopProcedure<>();
161    try {
162      manager.acquirePeerWorker(peerId1, proc1);
163      fail("Should suspend");
164    } catch (ProcedureSuspendedException e) {
165      // expected
166    }
167    try {
168      manager.acquirePeerWorker(peerId1, proc2);
169      fail("Should suspend");
170    } catch (ProcedureSuspendedException e) {
171      // expected
172    }
173
174    listeners.get(0).serverAdded(sn3);
175    assertEquals(2, wokenProcedures.size());
176    assertSame(proc2, wokenProcedures.poll());
177    assertSame(proc1, wokenProcedures.poll());
178  }
179}