001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.CoordinatedStateManager; 033import org.apache.hadoop.hbase.ServerMetricsBuilder; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableDescriptors; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ClusterConnection; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableState; 044import org.apache.hadoop.hbase.master.DummyRegionServerList; 045import org.apache.hadoop.hbase.master.LoadBalancer; 046import org.apache.hadoop.hbase.master.MasterFileSystem; 047import org.apache.hadoop.hbase.master.MasterServices; 048import org.apache.hadoop.hbase.master.MasterWalManager; 049import org.apache.hadoop.hbase.master.MockNoopMasterServices; 050import org.apache.hadoop.hbase.master.ServerManager; 051import org.apache.hadoop.hbase.master.SplitWALManager; 052import org.apache.hadoop.hbase.master.TableStateManager; 053import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.master.region.MasterRegion; 058import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 059import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 060import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 061import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 063import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 064import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 065import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 066import org.apache.hadoop.hbase.replication.ReplicationException; 067import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 068import org.apache.hadoop.hbase.security.Superusers; 069import org.apache.hadoop.hbase.util.CommonFSUtils; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.zookeeper.KeeperException; 072import org.mockito.invocation.InvocationOnMock; 073import org.mockito.stubbing.Answer; 074 075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 076 077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 086 087/** 088 * A mocked master services. Tries to fake it. May not always work. 089 */ 090public class MockMasterServices extends MockNoopMasterServices { 091 private final MasterFileSystem fileSystemManager; 092 private final MasterWalManager walManager; 093 private final SplitWALManager splitWALManager; 094 private final AssignmentManager assignmentManager; 095 private final TableStateManager tableStateManager; 096 private final MasterRegion masterRegion; 097 098 private MasterProcedureEnv procedureEnv; 099 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 100 private ProcedureStore procedureStore; 101 private final ClusterConnection connection; 102 private final LoadBalancer balancer; 103 private final ServerManager serverManager; 104 private final ReplicationPeerManager rpm; 105 106 private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); 107 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 108 public static final ServerName MOCK_MASTER_SERVERNAME = 109 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 110 111 public MockMasterServices(Configuration conf) throws IOException, ReplicationException { 112 super(conf); 113 Superusers.initialize(conf); 114 this.fileSystemManager = new MasterFileSystem(conf); 115 this.walManager = new MasterWalManager(this); 116 this.splitWALManager = 117 conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 118 ? null 119 : new SplitWALManager(this); 120 this.masterRegion = MasterRegionFactory.create(this); 121 // Mock an AM. 122 this.assignmentManager = 123 new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion)); 124 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 125 this.serverManager = new ServerManager(this, new DummyRegionServerList()); 126 this.tableStateManager = mock(TableStateManager.class); 127 when(this.tableStateManager.getTableState(any())).thenReturn(new TableState( 128 TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED)); 129 130 // Mock up a Client Interface 131 ClientProtos.ClientService.BlockingInterface ri = 132 mock(ClientProtos.ClientService.BlockingInterface.class); 133 MutateResponse.Builder builder = MutateResponse.newBuilder(); 134 builder.setProcessed(true); 135 try { 136 when(ri.mutate(any(), any())).thenReturn(builder.build()); 137 } catch (ServiceException se) { 138 throw ProtobufUtil.handleRemoteException(se); 139 } 140 try { 141 when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 142 @Override 143 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 144 return buildMultiResponse(invocation.getArgument(1)); 145 } 146 }); 147 } catch (ServiceException se) { 148 throw ProtobufUtil.getRemoteException(se); 149 } 150 // Mock n ClusterConnection and an AdminProtocol implementation. Have the 151 // ClusterConnection return the HRI. Have the HRI return a few mocked up responses 152 // to make our test work. 153 this.connection = HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), 154 mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME, 155 RegionInfoBuilder.FIRST_META_REGIONINFO); 156 // Set hbase.rootdir into test dir. 157 Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); 158 CommonFSUtils.setRootDir(getConfiguration(), rootdir); 159 this.rpm = mock(ReplicationPeerManager.class); 160 ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); 161 when(rqs.getAllQueues(any())).thenReturn(Collections.emptyList()); 162 when(rpm.getQueueStorage()).thenReturn(rqs); 163 } 164 165 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 166 throws IOException, KeeperException { 167 startProcedureExecutor(remoteDispatcher); 168 this.assignmentManager.start(); 169 for (int i = 0; i < numServes; ++i) { 170 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 171 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 172 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 173 } 174 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 175 } 176 177 /** 178 * Call this restart method only after running MockMasterServices#start() The RSs can be 179 * differentiated by the port number, see ServerName in MockMasterServices#start() method above. 180 * Restart of region server will have new startcode in server name 181 * @param serverName Server name to be restarted 182 */ 183 public void restartRegionServer(ServerName serverName) throws IOException { 184 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 185 long startCode = -1; 186 for (ServerName s : onlineServers) { 187 if (s.getAddress().equals(serverName.getAddress())) { 188 startCode = s.getStartcode() + 1; 189 break; 190 } 191 } 192 if (startCode == -1) { 193 return; 194 } 195 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 196 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 197 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 198 } 199 200 @Override 201 public void stop(String why) { 202 stopProcedureExecutor(); 203 this.assignmentManager.stop(); 204 } 205 206 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 207 throws IOException { 208 final Configuration conf = getConfiguration(); 209 this.procedureStore = new NoopProcedureStore(); 210 this.procedureStore.registerListener(new ProcedureStoreListener() { 211 212 @Override 213 public void abortProcess() { 214 abort("The Procedure Store lost the lease", null); 215 } 216 }); 217 218 this.procedureEnv = new MasterProcedureEnv(this, 219 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 220 221 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 222 procedureEnv.getProcedureScheduler()); 223 224 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 225 Math.max(Runtime.getRuntime().availableProcessors(), 226 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 227 final boolean abortOnCorruption = 228 conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 229 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 230 this.procedureStore.start(numThreads); 231 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 232 this.procedureEnv.getRemoteDispatcher().start(); 233 } 234 235 private void stopProcedureExecutor() { 236 if (this.procedureEnv != null) { 237 this.procedureEnv.getRemoteDispatcher().stop(); 238 } 239 240 if (this.procedureExecutor != null) { 241 this.procedureExecutor.stop(); 242 } 243 244 if (this.procedureStore != null) { 245 this.procedureStore.stop(isAborted()); 246 } 247 } 248 249 @Override 250 public boolean isInitialized() { 251 return true; 252 } 253 254 @Override 255 public ProcedureEvent<?> getInitializedEvent() { 256 return this.initialized; 257 } 258 259 @Override 260 public MasterFileSystem getMasterFileSystem() { 261 return fileSystemManager; 262 } 263 264 @Override 265 public MasterWalManager getMasterWalManager() { 266 return walManager; 267 } 268 269 @Override 270 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 271 return procedureExecutor; 272 } 273 274 @Override 275 public LoadBalancer getLoadBalancer() { 276 return balancer; 277 } 278 279 @Override 280 public ServerManager getServerManager() { 281 return serverManager; 282 } 283 284 @Override 285 public AssignmentManager getAssignmentManager() { 286 return assignmentManager; 287 } 288 289 @Override 290 public TableStateManager getTableStateManager() { 291 return tableStateManager; 292 } 293 294 @Override 295 public ClusterConnection getConnection() { 296 return this.connection; 297 } 298 299 @Override 300 public ServerName getServerName() { 301 return MOCK_MASTER_SERVERNAME; 302 } 303 304 @Override 305 public CoordinatedStateManager getCoordinatedStateManager() { 306 return super.getCoordinatedStateManager(); 307 } 308 309 private static class MockRegionStateStore extends RegionStateStore { 310 public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) { 311 super(master, masterRegion); 312 } 313 314 @Override 315 public void updateRegionLocation(RegionStateNode regionNode) throws IOException { 316 } 317 } 318 319 @Override 320 public TableDescriptors getTableDescriptors() { 321 return new TableDescriptors() { 322 @Override 323 public TableDescriptor remove(TableName tablename) throws IOException { 324 // noop 325 return null; 326 } 327 328 @Override 329 public Map<String, TableDescriptor> getAll() throws IOException { 330 // noop 331 return null; 332 } 333 334 @Override 335 public TableDescriptor get(TableName tablename) throws IOException { 336 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 337 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 338 return builder.build(); 339 } 340 341 @Override 342 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 343 return null; 344 } 345 346 @Override 347 public void update(TableDescriptor htd, boolean cacheOnly) throws IOException { 348 // noop 349 } 350 }; 351 } 352 353 private static MultiResponse buildMultiResponse(MultiRequest req) { 354 MultiResponse.Builder builder = MultiResponse.newBuilder(); 355 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 356 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 357 for (RegionAction regionAction : req.getRegionActionList()) { 358 regionActionResultBuilder.clear(); 359 for (ClientProtos.Action action : regionAction.getActionList()) { 360 roeBuilder.clear(); 361 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 362 roeBuilder.setIndex(action.getIndex()); 363 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 364 } 365 builder.addRegionActionResult(regionActionResultBuilder.build()); 366 } 367 return builder.build(); 368 } 369 370 @Override 371 public SplitWALManager getSplitWALManager() { 372 return splitWALManager; 373 } 374 375 @Override 376 public ReplicationPeerManager getReplicationPeerManager() { 377 return rpm; 378 } 379}