001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.CountDownLatch;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.StartTestingClusterOption;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.master.HMaster;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.RegionServerList;
044import org.apache.hadoop.hbase.master.ServerManager;
045import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
046import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
047import org.apache.hadoop.hbase.procedure2.Procedure;
048import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
049import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
050import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
051import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
052import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
054import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
066
067@Category({ MasterTests.class, MediumTests.class })
068public class TestMigrateReplicationQueueFromZkToTableProcedure {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class);
073
074  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  public static final class HMasterForTest extends HMaster {
077
078    public HMasterForTest(Configuration conf) throws IOException {
079      super(conf);
080    }
081
082    @Override
083    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
084      throws IOException {
085      setupClusterConnection();
086      return new ServerManagerForTest(master, storage);
087    }
088  }
089
090  private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS =
091    new ConcurrentHashMap<>();
092
093  public static final class ServerManagerForTest extends ServerManager {
094
095    public ServerManagerForTest(MasterServices master, RegionServerList storage) {
096      super(master, storage);
097    }
098
099    @Override
100    public Map<ServerName, ServerMetrics> getOnlineServers() {
101      Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers());
102      map.putAll(EXTRA_REGION_SERVERS);
103      return map;
104    }
105  }
106
107  @BeforeClass
108  public static void setupCluster() throws Exception {
109    // one hour, to make sure it will not run during the test
110    UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
111    UTIL.startMiniCluster(
112      StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
113  }
114
115  @AfterClass
116  public static void cleanupTest() throws Exception {
117    UTIL.shutdownMiniCluster();
118  }
119
120  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
121    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
122  }
123
124  @After
125  public void tearDown() throws Exception {
126    Admin admin = UTIL.getAdmin();
127    for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
128      admin.removeReplicationPeer(pd.getPeerId());
129    }
130  }
131
132  private static CountDownLatch PEER_PROC_ARRIVE;
133
134  private static CountDownLatch PEER_PROC_RESUME;
135
136  public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv>
137    implements PeerProcedureInterface {
138
139    private String peerId;
140
141    public FakePeerProcedure() {
142    }
143
144    public FakePeerProcedure(String peerId) {
145      this.peerId = peerId;
146    }
147
148    @Override
149    public String getPeerId() {
150      return peerId;
151    }
152
153    @Override
154    public PeerOperationType getPeerOperationType() {
155      return PeerOperationType.UPDATE_CONFIG;
156    }
157
158    @Override
159    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
160      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
161      PEER_PROC_ARRIVE.countDown();
162      PEER_PROC_RESUME.await();
163      return null;
164    }
165
166    @Override
167    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
168      throw new UnsupportedOperationException();
169    }
170
171    @Override
172    protected boolean abort(MasterProcedureEnv env) {
173      return false;
174    }
175
176    @Override
177    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
178    }
179
180    @Override
181    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
182    }
183  }
184
185  @Test
186  public void testWaitUntilNoPeerProcedure() throws Exception {
187    PEER_PROC_ARRIVE = new CountDownLatch(1);
188    PEER_PROC_RESUME = new CountDownLatch(1);
189    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
190    procExec.submitProcedure(new FakePeerProcedure("1"));
191    PEER_PROC_ARRIVE.await();
192    MigrateReplicationQueueFromZkToTableProcedure proc =
193      new MigrateReplicationQueueFromZkToTableProcedure();
194    procExec.submitProcedure(proc);
195    // make sure we will wait until there is no peer related procedures before proceeding
196    UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
197    // continue and make sure we can finish successfully
198    PEER_PROC_RESUME.countDown();
199    UTIL.waitFor(30000, () -> proc.isSuccess());
200  }
201
202  // make sure we will disable replication peers while migrating
203  // and also tests disable/enable replication log cleaner and wait for region server upgrading
204  @Test
205  public void testDisablePeerAndWaitStates() throws Exception {
206    String peerId = "2";
207    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
208      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
209      .setReplicateAllUserTables(true).build();
210    UTIL.getAdmin().addReplicationPeer(peerId, rpc);
211    // put a fake region server to simulate that there are still region servers with older version
212    ServerMetrics metrics = mock(ServerMetrics.class);
213    when(metrics.getVersion()).thenReturn("2.5.0");
214    EXTRA_REGION_SERVERS
215      .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
216
217    ReplicationLogCleanerBarrier barrier =
218      UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier();
219    assertTrue(barrier.start());
220
221    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
222
223    MigrateReplicationQueueFromZkToTableProcedure proc =
224      new MigrateReplicationQueueFromZkToTableProcedure();
225    procExec.submitProcedure(proc);
226
227    Thread.sleep(5000);
228    // make sure we are still waiting for replication log cleaner quit
229    assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
230      proc.getCurrentStateId());
231    barrier.stop();
232
233    // wait until we reach the wait upgrading state
234    UTIL.waitFor(30000,
235      () -> proc.getCurrentStateId()
236          == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
237        && proc.getState() == ProcedureState.WAITING_TIMEOUT);
238    // make sure the peer is disabled for migrating
239    assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
240    // make sure the replication log cleaner is disabled
241    assertFalse(barrier.start());
242
243    // the procedure should finish successfully
244    EXTRA_REGION_SERVERS.clear();
245    UTIL.waitFor(30000, () -> proc.isSuccess());
246
247    // make sure the peer is enabled again
248    assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
249    // make sure the replication log cleaner is enabled again
250    assertTrue(barrier.start());
251    barrier.stop();
252  }
253}