001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.CompletableFuture; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.CoordinatedStateManager; 034import org.apache.hadoop.hbase.ServerMetricsBuilder; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableDescriptors; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableState; 044import org.apache.hadoop.hbase.master.DummyRegionServerList; 045import org.apache.hadoop.hbase.master.LoadBalancer; 046import org.apache.hadoop.hbase.master.MasterFileSystem; 047import org.apache.hadoop.hbase.master.MasterServices; 048import org.apache.hadoop.hbase.master.MasterWalManager; 049import org.apache.hadoop.hbase.master.MockNoopMasterServices; 050import org.apache.hadoop.hbase.master.ServerManager; 051import org.apache.hadoop.hbase.master.SplitWALManager; 052import org.apache.hadoop.hbase.master.TableStateManager; 053import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.master.region.MasterRegion; 058import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 059import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 060import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 061import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 063import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 064import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 065import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 066import org.apache.hadoop.hbase.replication.ReplicationException; 067import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 068import org.apache.hadoop.hbase.security.Superusers; 069import org.apache.hadoop.hbase.util.CommonFSUtils; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.zookeeper.KeeperException; 072import org.mockito.invocation.InvocationOnMock; 073import org.mockito.stubbing.Answer; 074 075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 076 077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 085 086/** 087 * A mocked master services. Tries to fake it. May not always work. 088 */ 089public class MockMasterServices extends MockNoopMasterServices { 090 private final MasterFileSystem fileSystemManager; 091 private final MasterWalManager walManager; 092 private final SplitWALManager splitWALManager; 093 private final AssignmentManager assignmentManager; 094 private final TableStateManager tableStateManager; 095 private final MasterRegion masterRegion; 096 097 private MasterProcedureEnv procedureEnv; 098 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 099 private ProcedureStore procedureStore; 100 private final Connection connection; 101 private final LoadBalancer balancer; 102 private final ServerManager serverManager; 103 private final ReplicationPeerManager rpm; 104 105 private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); 106 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 107 public static final ServerName MOCK_MASTER_SERVERNAME = 108 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 109 110 public MockMasterServices(Configuration conf) throws IOException, ReplicationException { 111 super(conf); 112 Superusers.initialize(conf); 113 this.fileSystemManager = new MasterFileSystem(conf); 114 this.walManager = new MasterWalManager(this); 115 this.splitWALManager = 116 conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 117 ? null 118 : new SplitWALManager(this); 119 this.masterRegion = MasterRegionFactory.create(this); 120 // Mock an AM. 121 this.assignmentManager = 122 new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion)); 123 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 124 this.serverManager = new ServerManager(this, new DummyRegionServerList()); 125 this.tableStateManager = mock(TableStateManager.class); 126 when(this.tableStateManager.getTableState(any())).thenReturn(new TableState( 127 TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED)); 128 129 // Mock up a Client Interface 130 ClientProtos.ClientService.BlockingInterface ri = 131 mock(ClientProtos.ClientService.BlockingInterface.class); 132 MutateResponse.Builder builder = MutateResponse.newBuilder(); 133 builder.setProcessed(true); 134 try { 135 when(ri.mutate(any(), any())).thenReturn(builder.build()); 136 } catch (ServiceException se) { 137 throw ProtobufUtil.handleRemoteException(se); 138 } 139 try { 140 when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 141 @Override 142 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 143 return buildMultiResponse(invocation.getArgument(1)); 144 } 145 }); 146 } catch (ServiceException se) { 147 throw ProtobufUtil.getRemoteException(se); 148 } 149 this.connection = HConnectionTestingUtility.getMockedConnection(getConfiguration()); 150 // Set hbase.rootdir into test dir. 151 Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); 152 CommonFSUtils.setRootDir(getConfiguration(), rootdir); 153 this.rpm = mock(ReplicationPeerManager.class); 154 ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); 155 when(rqs.listAllQueueIds(any(ServerName.class))).thenReturn(Collections.emptyList()); 156 when(rpm.getQueueStorage()).thenReturn(rqs); 157 } 158 159 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 160 throws IOException, KeeperException { 161 startProcedureExecutor(remoteDispatcher); 162 this.assignmentManager.start(); 163 for (int i = 0; i < numServes; ++i) { 164 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 165 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 166 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 167 } 168 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 169 } 170 171 /** 172 * Call this restart method only after running MockMasterServices#start() The RSs can be 173 * differentiated by the port number, see ServerName in MockMasterServices#start() method above. 174 * Restart of region server will have new startcode in server name 175 * @param serverName Server name to be restarted 176 */ 177 public void restartRegionServer(ServerName serverName) throws IOException { 178 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 179 long startCode = -1; 180 for (ServerName s : onlineServers) { 181 if (s.getAddress().equals(serverName.getAddress())) { 182 startCode = s.getStartcode() + 1; 183 break; 184 } 185 } 186 if (startCode == -1) { 187 return; 188 } 189 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 190 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 191 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 192 } 193 194 @Override 195 public void stop(String why) { 196 stopProcedureExecutor(); 197 this.assignmentManager.stop(); 198 } 199 200 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 201 throws IOException { 202 final Configuration conf = getConfiguration(); 203 this.procedureStore = new NoopProcedureStore(); 204 this.procedureStore.registerListener(new ProcedureStoreListener() { 205 206 @Override 207 public void abortProcess() { 208 abort("The Procedure Store lost the lease", null); 209 } 210 }); 211 212 this.procedureEnv = new MasterProcedureEnv(this, 213 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 214 215 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 216 procedureEnv.getProcedureScheduler()); 217 218 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 219 Math.max(Runtime.getRuntime().availableProcessors(), 220 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 221 final boolean abortOnCorruption = 222 conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 223 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 224 this.procedureStore.start(numThreads); 225 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 226 this.procedureEnv.getRemoteDispatcher().start(); 227 } 228 229 private void stopProcedureExecutor() { 230 if (this.procedureEnv != null) { 231 this.procedureEnv.getRemoteDispatcher().stop(); 232 } 233 234 if (this.procedureExecutor != null) { 235 this.procedureExecutor.stop(); 236 } 237 238 if (this.procedureStore != null) { 239 this.procedureStore.stop(isAborted()); 240 } 241 } 242 243 @Override 244 public boolean isInitialized() { 245 return true; 246 } 247 248 @Override 249 public ProcedureEvent<?> getInitializedEvent() { 250 return this.initialized; 251 } 252 253 @Override 254 public MasterFileSystem getMasterFileSystem() { 255 return fileSystemManager; 256 } 257 258 @Override 259 public MasterWalManager getMasterWalManager() { 260 return walManager; 261 } 262 263 @Override 264 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 265 return procedureExecutor; 266 } 267 268 @Override 269 public LoadBalancer getLoadBalancer() { 270 return balancer; 271 } 272 273 @Override 274 public ServerManager getServerManager() { 275 return serverManager; 276 } 277 278 @Override 279 public AssignmentManager getAssignmentManager() { 280 return assignmentManager; 281 } 282 283 @Override 284 public TableStateManager getTableStateManager() { 285 return tableStateManager; 286 } 287 288 @Override 289 public Connection getConnection() { 290 return this.connection; 291 } 292 293 @Override 294 public ServerName getServerName() { 295 return MOCK_MASTER_SERVERNAME; 296 } 297 298 @Override 299 public CoordinatedStateManager getCoordinatedStateManager() { 300 return super.getCoordinatedStateManager(); 301 } 302 303 private static class MockRegionStateStore extends RegionStateStore { 304 public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) { 305 super(master, masterRegion); 306 } 307 308 @Override 309 public CompletableFuture<Void> updateRegionLocation(RegionStateNode regionNode) { 310 return CompletableFuture.completedFuture(null); 311 } 312 } 313 314 @Override 315 public TableDescriptors getTableDescriptors() { 316 return new TableDescriptors() { 317 @Override 318 public TableDescriptor remove(TableName tablename) throws IOException { 319 // noop 320 return null; 321 } 322 323 @Override 324 public Map<String, TableDescriptor> getAll() throws IOException { 325 // noop 326 return null; 327 } 328 329 @Override 330 public TableDescriptor get(TableName tablename) throws IOException { 331 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 332 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 333 return builder.build(); 334 } 335 336 @Override 337 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 338 return null; 339 } 340 341 @Override 342 public void update(TableDescriptor htd, boolean cacheOnly) throws IOException { 343 // noop 344 } 345 }; 346 } 347 348 private static MultiResponse buildMultiResponse(MultiRequest req) { 349 MultiResponse.Builder builder = MultiResponse.newBuilder(); 350 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 351 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 352 for (RegionAction regionAction : req.getRegionActionList()) { 353 regionActionResultBuilder.clear(); 354 for (ClientProtos.Action action : regionAction.getActionList()) { 355 roeBuilder.clear(); 356 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 357 roeBuilder.setIndex(action.getIndex()); 358 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 359 } 360 builder.addRegionActionResult(regionActionResultBuilder.build()); 361 } 362 return builder.build(); 363 } 364 365 @Override 366 public SplitWALManager getSplitWALManager() { 367 return splitWALManager; 368 } 369 370 @Override 371 public ReplicationPeerManager getReplicationPeerManager() { 372 return rpm; 373 } 374}