001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.net.UnknownHostException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
028import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
029import org.apache.hadoop.hbase.master.MasterServices;
030import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
031import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
036
037/**
038 * Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose.
039 */
040public class RSProcDispatcher extends RSProcedureDispatcher {
041
042  private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class);
043
044  private static final AtomicInteger I = new AtomicInteger();
045
046  private static final List<IOException> ERRORS =
047    Arrays.asList(new ConnectionClosedException("test connection closed error..."),
048      new UnknownHostException("test unknown host error..."));
049  private static final AtomicInteger ERROR_IDX = new AtomicInteger();
050
051  public RSProcDispatcher(MasterServices master) {
052    super(master);
053  }
054
055  @Override
056  protected void remoteDispatch(final ServerName serverName,
057    final Set<RemoteProcedure> remoteProcedures) {
058    if (!master.getServerManager().isServerOnline(serverName)) {
059      // fail fast
060      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
061    } else {
062      submitTask(new TestExecuteProceduresRemoteCall(serverName, remoteProcedures));
063    }
064  }
065
066  class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall {
067
068    public TestExecuteProceduresRemoteCall(ServerName serverName,
069      Set<RemoteProcedure> remoteProcedures) {
070      super(serverName, remoteProcedures);
071    }
072
073    @Override
074    public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
075      final AdminProtos.ExecuteProceduresRequest request) throws IOException {
076      int j = I.addAndGet(1);
077      LOG.info("sendRequest() req: {} , j: {}", request, j);
078      if (j == 12 || j == 22) {
079        // Execute the remote close and open region requests in the last (5th) retry before
080        // throwing ConnectionClosedException. This is to ensure even if the region open/close
081        // is successfully completed by regionserver, master still schedules SCP because
082        // sendRequest() throws error which has retry-limit exhausted.
083        FutureUtils.get(getRsAdmin().executeProcedures(request));
084      }
085      // For one of the close region requests and one of the open region requests,
086      // throw ConnectionClosedException until retry limit is exhausted and master
087      // schedules recoveries for the server.
088      // We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own.
089      if (j >= 8 && j <= 13 || j >= 18 && j <= 23) {
090        throw ERRORS.get(ERROR_IDX.getAndIncrement() % ERRORS.size());
091      }
092      return FutureUtils.get(getRsAdmin().executeProcedures(request));
093    }
094
095    private AsyncRegionServerAdmin getRsAdmin() {
096      return master.getAsyncClusterConnection().getRegionServerAdmin(getServerName());
097    }
098  }
099
100  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
101
102    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
103      super(serverName, remoteProcedures);
104    }
105
106    @Override
107    public void run() {
108      remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(),
109        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
110    }
111  }
112
113}