001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.Set;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.master.MasterServices;
039import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
040import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
041import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
042import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
043import org.apache.hadoop.hbase.procedure2.Procedure;
044import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
045import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
046import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.rules.TestName;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
064
065@Category({ MasterTests.class, MediumTests.class })
066public class TestServerRemoteProcedure {
067  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
071  @Rule
072  public TestName name = new TestName();
073  private HBaseTestingUtility util;
074  private MockRSProcedureDispatcher rsDispatcher;
075  private MockMasterServices master;
076  private AssignmentManager am;
077  // Simple executor to run some simple tasks.
078  private ScheduledExecutorService executor;
079
080  @Before
081  public void setUp() throws Exception {
082    util = new HBaseTestingUtility();
083    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
084      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
085    master = new MockMasterServices(util.getConfiguration());
086    rsDispatcher = new MockRSProcedureDispatcher(master);
087    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
088    master.start(2, rsDispatcher);
089    am = master.getAssignmentManager();
090    master.getServerManager().getOnlineServersList().stream()
091      .forEach(serverName -> am.getRegionStates().createServer(serverName));
092  }
093
094  @After
095  public void tearDown() throws Exception {
096    master.stop("tearDown");
097    this.executor.shutdownNow();
098  }
099
100  @Test
101  public void testSplitWALAndCrashBeforeResponse() throws Exception {
102    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
103    ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
104    ServerRemoteProcedure splitWALRemoteProcedure =
105      new SplitWALRemoteProcedure(worker, crashedWorker, "test");
106    Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
107    Thread.sleep(2000);
108    master.getServerManager().expireServer(worker);
109    // if remoteCallFailed is called for this procedure, this procedure should be finished.
110    future.get(5000, TimeUnit.MILLISECONDS);
111    Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
112  }
113
114  @Test
115  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
116    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
117    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
118    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
119    Thread.sleep(2000);
120    // complete the process and fail the process at the same time
121    ExecutorService threadPool = Executors.newFixedThreadPool(2);
122    threadPool.execute(() -> noopServerRemoteProcedure
123      .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
124    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
125      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
126    future.get(2000, TimeUnit.MILLISECONDS);
127    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
128  }
129
130  @Test
131  public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
132    TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
133    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
134      .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
135    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
136    env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
137    TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
138    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
139    OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
140    Future<byte[]> future = submitProcedure(openRegionProcedure);
141    Thread.sleep(2000);
142    rsDispatcher.removeNode(worker);
143    try {
144      future.get(2000, TimeUnit.MILLISECONDS);
145      fail();
146    } catch (TimeoutException e) {
147      LOG.info("timeout is expected");
148    }
149    Assert.assertFalse(openRegionProcedure.isFinished());
150  }
151
152  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
153    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
154  }
155
156  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
157    implements ServerProcedureInterface {
158
159    public NoopServerRemoteProcedure(ServerName targetServer) {
160      this.targetServer = targetServer;
161    }
162
163    @Override
164    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
165      return;
166    }
167
168    @Override
169    protected boolean abort(MasterProcedureEnv env) {
170      return false;
171    }
172
173    @Override
174    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
175      return;
176    }
177
178    @Override
179    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
180      return;
181    }
182
183    @Override
184    public Optional<RemoteProcedureDispatcher.RemoteOperation>
185      remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
186      return Optional
187        .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]));
188    }
189
190    @Override
191    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
192      complete(env, null);
193    }
194
195    @Override
196    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
197      RemoteProcedureException error) {
198      complete(env, error);
199    }
200
201    @Override
202    public void complete(MasterProcedureEnv env, Throwable error) {
203      this.succ = true;
204      return;
205    }
206
207    @Override
208    public ServerName getServerName() {
209      return targetServer;
210    }
211
212    @Override
213    public boolean hasMetaTableRegion() {
214      return false;
215    }
216
217    @Override
218    public ServerOperationType getServerOperationType() {
219      return SWITCH_RPC_THROTTLE;
220    }
221
222  }
223
224  protected interface MockRSExecutor {
225    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
226      AdminProtos.ExecuteProceduresRequest req) throws IOException;
227  }
228
229  protected static class NoopRSExecutor implements MockRSExecutor {
230    @Override
231    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
232      AdminProtos.ExecuteProceduresRequest req) throws IOException {
233      if (req.getOpenRegionCount() > 0) {
234        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
235          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
236            execOpenRegion(server, openReq);
237          }
238        }
239      }
240      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
241    }
242
243    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
244      AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
245      return null;
246    }
247  }
248
249  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
250    private MockRSExecutor mockRsExec;
251
252    public MockRSProcedureDispatcher(final MasterServices master) {
253      super(master);
254    }
255
256    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
257      this.mockRsExec = mockRsExec;
258    }
259
260    @Override
261    protected void remoteDispatch(ServerName serverName,
262      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
263      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
264    }
265
266    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
267      public MockRemoteCall(final ServerName serverName,
268        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
269        super(serverName, operations);
270      }
271
272      @Override
273      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
274        final AdminProtos.ExecuteProceduresRequest request) throws IOException {
275        return mockRsExec.sendRequest(serverName, request);
276      }
277    }
278  }
279}