001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.CoordinatedStateManager;
033import org.apache.hadoop.hbase.ServerMetricsBuilder;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableDescriptors;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ClusterConnection;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.client.TableState;
044import org.apache.hadoop.hbase.master.DummyRegionServerList;
045import org.apache.hadoop.hbase.master.LoadBalancer;
046import org.apache.hadoop.hbase.master.MasterFileSystem;
047import org.apache.hadoop.hbase.master.MasterServices;
048import org.apache.hadoop.hbase.master.MasterWalManager;
049import org.apache.hadoop.hbase.master.MockNoopMasterServices;
050import org.apache.hadoop.hbase.master.ServerManager;
051import org.apache.hadoop.hbase.master.SplitWALManager;
052import org.apache.hadoop.hbase.master.TableStateManager;
053import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
054import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
057import org.apache.hadoop.hbase.master.region.MasterRegion;
058import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
059import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
060import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
061import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
063import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
064import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
065import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
066import org.apache.hadoop.hbase.replication.ReplicationException;
067import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
068import org.apache.hadoop.hbase.security.Superusers;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.zookeeper.KeeperException;
072import org.mockito.invocation.InvocationOnMock;
073import org.mockito.stubbing.Answer;
074
075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
086
087/**
088 * A mocked master services. Tries to fake it. May not always work.
089 */
090public class MockMasterServices extends MockNoopMasterServices {
091  private final MasterFileSystem fileSystemManager;
092  private final MasterWalManager walManager;
093  private final SplitWALManager splitWALManager;
094  private final AssignmentManager assignmentManager;
095  private final TableStateManager tableStateManager;
096  private final MasterRegion masterRegion;
097
098  private MasterProcedureEnv procedureEnv;
099  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
100  private ProcedureStore procedureStore;
101  private final ClusterConnection connection;
102  private final LoadBalancer balancer;
103  private final ServerManager serverManager;
104  private final ReplicationPeerManager rpm;
105
106  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
107  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
108  public static final ServerName MOCK_MASTER_SERVERNAME =
109    ServerName.valueOf("mockmaster.example.org", 1234, -1L);
110
111  public MockMasterServices(Configuration conf) throws IOException, ReplicationException {
112    super(conf);
113    Superusers.initialize(conf);
114    this.fileSystemManager = new MasterFileSystem(conf);
115    this.walManager = new MasterWalManager(this);
116    this.splitWALManager =
117      conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
118        ? null
119        : new SplitWALManager(this);
120    this.masterRegion = MasterRegionFactory.create(this);
121    // Mock an AM.
122    this.assignmentManager =
123      new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion));
124    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
125    this.serverManager = new ServerManager(this, new DummyRegionServerList());
126    this.tableStateManager = mock(TableStateManager.class);
127    when(this.tableStateManager.getTableState(any())).thenReturn(new TableState(
128      TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED));
129
130    // Mock up a Client Interface
131    ClientProtos.ClientService.BlockingInterface ri =
132      mock(ClientProtos.ClientService.BlockingInterface.class);
133    MutateResponse.Builder builder = MutateResponse.newBuilder();
134    builder.setProcessed(true);
135    try {
136      when(ri.mutate(any(), any())).thenReturn(builder.build());
137    } catch (ServiceException se) {
138      throw ProtobufUtil.handleRemoteException(se);
139    }
140    try {
141      when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
142        @Override
143        public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
144          return buildMultiResponse(invocation.getArgument(1));
145        }
146      });
147    } catch (ServiceException se) {
148      throw ProtobufUtil.getRemoteException(se);
149    }
150    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
151    // ClusterConnection return the HRI. Have the HRI return a few mocked up responses
152    // to make our test work.
153    this.connection = HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
154      mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
155      RegionInfoBuilder.FIRST_META_REGIONINFO);
156    // Set hbase.rootdir into test dir.
157    Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
158    CommonFSUtils.setRootDir(getConfiguration(), rootdir);
159    this.rpm = mock(ReplicationPeerManager.class);
160    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
161    when(rqs.getAllQueues(any())).thenReturn(Collections.emptyList());
162    when(rpm.getQueueStorage()).thenReturn(rqs);
163  }
164
165  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
166    throws IOException, KeeperException {
167    startProcedureExecutor(remoteDispatcher);
168    this.assignmentManager.start();
169    for (int i = 0; i < numServes; ++i) {
170      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
171      serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
172        .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
173    }
174    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
175  }
176
177  /**
178   * Call this restart method only after running MockMasterServices#start() The RSs can be
179   * differentiated by the port number, see ServerName in MockMasterServices#start() method above.
180   * Restart of region server will have new startcode in server name
181   * @param serverName Server name to be restarted
182   */
183  public void restartRegionServer(ServerName serverName) throws IOException {
184    List<ServerName> onlineServers = serverManager.getOnlineServersList();
185    long startCode = -1;
186    for (ServerName s : onlineServers) {
187      if (s.getAddress().equals(serverName.getAddress())) {
188        startCode = s.getStartcode() + 1;
189        break;
190      }
191    }
192    if (startCode == -1) {
193      return;
194    }
195    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
196    serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
197      .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
198  }
199
200  @Override
201  public void stop(String why) {
202    stopProcedureExecutor();
203    this.assignmentManager.stop();
204  }
205
206  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
207    throws IOException {
208    final Configuration conf = getConfiguration();
209    this.procedureStore = new NoopProcedureStore();
210    this.procedureStore.registerListener(new ProcedureStoreListener() {
211
212      @Override
213      public void abortProcess() {
214        abort("The Procedure Store lost the lease", null);
215      }
216    });
217
218    this.procedureEnv = new MasterProcedureEnv(this,
219      remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
220
221    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
222      procedureEnv.getProcedureScheduler());
223
224    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
225      Math.max(Runtime.getRuntime().availableProcessors(),
226        MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
227    final boolean abortOnCorruption =
228      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
229        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
230    this.procedureStore.start(numThreads);
231    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
232    this.procedureEnv.getRemoteDispatcher().start();
233  }
234
235  private void stopProcedureExecutor() {
236    if (this.procedureEnv != null) {
237      this.procedureEnv.getRemoteDispatcher().stop();
238    }
239
240    if (this.procedureExecutor != null) {
241      this.procedureExecutor.stop();
242    }
243
244    if (this.procedureStore != null) {
245      this.procedureStore.stop(isAborted());
246    }
247  }
248
249  @Override
250  public boolean isInitialized() {
251    return true;
252  }
253
254  @Override
255  public ProcedureEvent<?> getInitializedEvent() {
256    return this.initialized;
257  }
258
259  @Override
260  public MasterFileSystem getMasterFileSystem() {
261    return fileSystemManager;
262  }
263
264  @Override
265  public MasterWalManager getMasterWalManager() {
266    return walManager;
267  }
268
269  @Override
270  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
271    return procedureExecutor;
272  }
273
274  @Override
275  public LoadBalancer getLoadBalancer() {
276    return balancer;
277  }
278
279  @Override
280  public ServerManager getServerManager() {
281    return serverManager;
282  }
283
284  @Override
285  public AssignmentManager getAssignmentManager() {
286    return assignmentManager;
287  }
288
289  @Override
290  public TableStateManager getTableStateManager() {
291    return tableStateManager;
292  }
293
294  @Override
295  public ClusterConnection getConnection() {
296    return this.connection;
297  }
298
299  @Override
300  public ServerName getServerName() {
301    return MOCK_MASTER_SERVERNAME;
302  }
303
304  @Override
305  public CoordinatedStateManager getCoordinatedStateManager() {
306    return super.getCoordinatedStateManager();
307  }
308
309  private static class MockRegionStateStore extends RegionStateStore {
310    public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) {
311      super(master, masterRegion);
312    }
313
314    @Override
315    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
316    }
317  }
318
319  @Override
320  public TableDescriptors getTableDescriptors() {
321    return new TableDescriptors() {
322      @Override
323      public TableDescriptor remove(TableName tablename) throws IOException {
324        // noop
325        return null;
326      }
327
328      @Override
329      public Map<String, TableDescriptor> getAll() throws IOException {
330        // noop
331        return null;
332      }
333
334      @Override
335      public TableDescriptor get(TableName tablename) throws IOException {
336        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
337        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
338        return builder.build();
339      }
340
341      @Override
342      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
343        return null;
344      }
345
346      @Override
347      public void update(TableDescriptor htd, boolean cacheOnly) throws IOException {
348        // noop
349      }
350    };
351  }
352
353  private static MultiResponse buildMultiResponse(MultiRequest req) {
354    MultiResponse.Builder builder = MultiResponse.newBuilder();
355    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
356    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
357    for (RegionAction regionAction : req.getRegionActionList()) {
358      regionActionResultBuilder.clear();
359      for (ClientProtos.Action action : regionAction.getActionList()) {
360        roeBuilder.clear();
361        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
362        roeBuilder.setIndex(action.getIndex());
363        regionActionResultBuilder.addResultOrException(roeBuilder.build());
364      }
365      builder.addRegionActionResult(regionActionResultBuilder.build());
366    }
367    return builder.build();
368  }
369
370  @Override
371  public SplitWALManager getSplitWALManager() {
372    return splitWALManager;
373  }
374
375  @Override
376  public ReplicationPeerManager getReplicationPeerManager() {
377    return rpm;
378  }
379}