001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Set;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
025import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
026import org.apache.hadoop.hbase.master.MasterServices;
027import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
028import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
033
034/**
035 * Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose.
036 */
037public class RSProcDispatcher extends RSProcedureDispatcher {
038
039  private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class);
040
041  private static final AtomicInteger i = new AtomicInteger();
042
043  public RSProcDispatcher(MasterServices master) {
044    super(master);
045  }
046
047  @Override
048  protected void remoteDispatch(final ServerName serverName,
049    final Set<RemoteProcedure> remoteProcedures) {
050    if (!master.getServerManager().isServerOnline(serverName)) {
051      // fail fast
052      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
053    } else {
054      submitTask(new TestExecuteProceduresRemoteCall(serverName, remoteProcedures));
055    }
056  }
057
058  class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall {
059
060    public TestExecuteProceduresRemoteCall(ServerName serverName,
061      Set<RemoteProcedure> remoteProcedures) {
062      super(serverName, remoteProcedures);
063    }
064
065    @Override
066    public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
067      final AdminProtos.ExecuteProceduresRequest request) throws IOException {
068      int j = i.addAndGet(1);
069      LOG.info("sendRequest() req: {} , j: {}", request, j);
070      if (j == 12 || j == 22) {
071        // Execute the remote close and open region requests in the last (5th) retry before
072        // throwing ConnectionClosedException. This is to ensure even if the region open/close
073        // is successfully completed by regionserver, master still schedules SCP because
074        // sendRequest() throws error which has retry-limit exhausted.
075        FutureUtils.get(getRsAdmin().executeProcedures(request));
076      }
077      // For one of the close region requests and one of the open region requests,
078      // throw ConnectionClosedException until retry limit is exhausted and master
079      // schedules recoveries for the server.
080      // We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own.
081      if (j >= 8 && j <= 13 || j >= 18 && j <= 23) {
082        throw new ConnectionClosedException("test connection closed error...");
083      }
084      return FutureUtils.get(getRsAdmin().executeProcedures(request));
085    }
086
087    private AsyncRegionServerAdmin getRsAdmin() {
088      return master.getAsyncClusterConnection().getRegionServerAdmin(getServerName());
089    }
090  }
091
092  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
093
094    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
095      super(serverName, remoteProcedures);
096    }
097
098    @Override
099    public void run() {
100      remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(),
101        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
102    }
103  }
104
105}