001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Optional; 025import java.util.Set; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.ScheduledExecutorService; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.master.MasterServices; 039import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 040import org.apache.hadoop.hbase.master.assignment.MockMasterServices; 041import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure; 042import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 043import org.apache.hadoop.hbase.procedure2.Procedure; 044import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 045import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 046import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 047import org.apache.hadoop.hbase.testclassification.MasterTests; 048import org.apache.hadoop.hbase.testclassification.MediumTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.junit.After; 051import org.junit.Assert; 052import org.junit.Before; 053import org.junit.ClassRule; 054import org.junit.Rule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.rules.TestName; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 064 065@Category({ MasterTests.class, MediumTests.class }) 066public class TestServerRemoteProcedure { 067 private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); 071 @Rule 072 public TestName name = new TestName(); 073 private HBaseTestingUtility util; 074 private MockRSProcedureDispatcher rsDispatcher; 075 private MockMasterServices master; 076 private AssignmentManager am; 077 // Simple executor to run some simple tasks. 078 private ScheduledExecutorService executor; 079 080 @Before 081 public void setUp() throws Exception { 082 util = new HBaseTestingUtility(); 083 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 084 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 085 master = new MockMasterServices(util.getConfiguration()); 086 rsDispatcher = new MockRSProcedureDispatcher(master); 087 rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); 088 master.start(2, rsDispatcher); 089 am = master.getAssignmentManager(); 090 master.getServerManager().getOnlineServersList().stream() 091 .forEach(serverName -> am.getRegionStates().createServer(serverName)); 092 } 093 094 @After 095 public void tearDown() throws Exception { 096 master.stop("tearDown"); 097 this.executor.shutdownNow(); 098 } 099 100 @Test 101 public void testSplitWALAndCrashBeforeResponse() throws Exception { 102 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 103 ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); 104 ServerRemoteProcedure splitWALRemoteProcedure = 105 new SplitWALRemoteProcedure(worker, crashedWorker, "test"); 106 Future<byte[]> future = submitProcedure(splitWALRemoteProcedure); 107 Thread.sleep(2000); 108 master.getServerManager().expireServer(worker); 109 // if remoteCallFailed is called for this procedure, this procedure should be finished. 110 future.get(5000, TimeUnit.MILLISECONDS); 111 Assert.assertTrue(splitWALRemoteProcedure.isSuccess()); 112 } 113 114 @Test 115 public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { 116 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 117 ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); 118 Future<byte[]> future = submitProcedure(noopServerRemoteProcedure); 119 Thread.sleep(2000); 120 // complete the process and fail the process at the same time 121 ExecutorService threadPool = Executors.newFixedThreadPool(2); 122 threadPool.execute(() -> noopServerRemoteProcedure 123 .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); 124 threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( 125 master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); 126 future.get(2000, TimeUnit.MILLISECONDS); 127 Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); 128 } 129 130 @Test 131 public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception { 132 TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher"); 133 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1)) 134 .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build(); 135 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 136 env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri); 137 TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null); 138 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 139 OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker); 140 Future<byte[]> future = submitProcedure(openRegionProcedure); 141 Thread.sleep(2000); 142 rsDispatcher.removeNode(worker); 143 try { 144 future.get(2000, TimeUnit.MILLISECONDS); 145 fail(); 146 } catch (TimeoutException e) { 147 LOG.info("timeout is expected"); 148 } 149 Assert.assertFalse(openRegionProcedure.isFinished()); 150 } 151 152 private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 153 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 154 } 155 156 private static class NoopServerRemoteProcedure extends ServerRemoteProcedure 157 implements ServerProcedureInterface { 158 159 public NoopServerRemoteProcedure(ServerName targetServer) { 160 this.targetServer = targetServer; 161 } 162 163 @Override 164 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 165 return; 166 } 167 168 @Override 169 protected boolean abort(MasterProcedureEnv env) { 170 return false; 171 } 172 173 @Override 174 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 175 return; 176 } 177 178 @Override 179 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 180 return; 181 } 182 183 @Override 184 public Optional<RemoteProcedureDispatcher.RemoteOperation> 185 remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { 186 return Optional 187 .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0])); 188 } 189 190 @Override 191 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 192 complete(env, null); 193 } 194 195 @Override 196 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 197 RemoteProcedureException error) { 198 complete(env, error); 199 } 200 201 @Override 202 public void complete(MasterProcedureEnv env, Throwable error) { 203 this.succ = true; 204 return; 205 } 206 207 @Override 208 public ServerName getServerName() { 209 return targetServer; 210 } 211 212 @Override 213 public boolean hasMetaTableRegion() { 214 return false; 215 } 216 217 @Override 218 public ServerOperationType getServerOperationType() { 219 return SWITCH_RPC_THROTTLE; 220 } 221 222 } 223 224 protected interface MockRSExecutor { 225 AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 226 AdminProtos.ExecuteProceduresRequest req) throws IOException; 227 } 228 229 protected static class NoopRSExecutor implements MockRSExecutor { 230 @Override 231 public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 232 AdminProtos.ExecuteProceduresRequest req) throws IOException { 233 if (req.getOpenRegionCount() > 0) { 234 for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { 235 for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { 236 execOpenRegion(server, openReq); 237 } 238 } 239 } 240 return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); 241 } 242 243 protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, 244 AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { 245 return null; 246 } 247 } 248 249 protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { 250 private MockRSExecutor mockRsExec; 251 252 public MockRSProcedureDispatcher(final MasterServices master) { 253 super(master); 254 } 255 256 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 257 this.mockRsExec = mockRsExec; 258 } 259 260 @Override 261 protected void remoteDispatch(ServerName serverName, 262 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 263 submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); 264 } 265 266 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 267 public MockRemoteCall(final ServerName serverName, 268 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 269 super(serverName, operations); 270 } 271 272 @Override 273 protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, 274 final AdminProtos.ExecuteProceduresRequest request) throws IOException { 275 return mockRsExec.sendRequest(serverName, request); 276 } 277 } 278 } 279}