001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.atomic.AtomicInteger;
022import org.apache.hadoop.hbase.Abortable;
023import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
024import org.apache.hadoop.util.StringUtils;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Thread to handle rpc call. Should only be used in {@link RpcExecutor} and its sub-classes.
031 */
032@InterfaceAudience.Private
033public class RpcHandler extends Thread {
034  private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
035
036  /**
037   * Q to find CallRunners to run in.
038   */
039  final BlockingQueue<CallRunner> q;
040
041  final int handlerCount;
042  final double handlerFailureThreshhold;
043
044  // metrics (shared with other handlers)
045  final AtomicInteger activeHandlerCount;
046  final AtomicInteger failedHandlerCount;
047
048  // The up-level RpcServer.
049  final Abortable abortable;
050
051  private boolean running;
052
053  RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
054    final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
055    final AtomicInteger failedHandlerCount, final Abortable abortable) {
056    super(name);
057    setDaemon(true);
058    this.q = q;
059    this.handlerFailureThreshhold = handlerFailureThreshhold;
060    this.activeHandlerCount = activeHandlerCount;
061    this.failedHandlerCount = failedHandlerCount;
062    this.handlerCount = handlerCount;
063    this.abortable = abortable;
064  }
065
066  /**
067   * @return A {@link CallRunner}
068   * @throws InterruptedException thrown by {@link BlockingQueue#take()}
069   */
070  protected CallRunner getCallRunner() throws InterruptedException {
071    return this.q.take();
072  }
073
074  public void stopRunning() {
075    running = false;
076  }
077
078  @Override
079  public void run() {
080    boolean interrupted = false;
081    running = true;
082    try {
083      while (running) {
084        try {
085          run(getCallRunner());
086        } catch (InterruptedException e) {
087          interrupted = true;
088        }
089      }
090    } catch (Exception e) {
091      LOG.warn(e.toString(), e);
092      throw e;
093    } finally {
094      if (interrupted) {
095        Thread.currentThread().interrupt();
096      }
097    }
098  }
099
100  private void run(CallRunner cr) {
101    MonitoredRPCHandler status = RpcServer.getStatus();
102    cr.setStatus(status);
103    try {
104      this.activeHandlerCount.incrementAndGet();
105      cr.run();
106    } catch (Throwable e) {
107      if (e instanceof Error) {
108        int failedCount = failedHandlerCount.incrementAndGet();
109        if (
110          this.handlerFailureThreshhold >= 0
111            && failedCount > handlerCount * this.handlerFailureThreshhold
112        ) {
113          String message = "Number of failed RpcServer handler runs exceeded threshhold "
114            + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
115          if (abortable != null) {
116            abortable.abort(message, e);
117          } else {
118            LOG.error("Error but can't abort because abortable is null: "
119              + StringUtils.stringifyException(e));
120            throw e;
121          }
122        } else {
123          LOG.warn("Handler errors " + StringUtils.stringifyException(e));
124        }
125      } else {
126        LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
127      }
128    } finally {
129      this.activeHandlerCount.decrementAndGet();
130    }
131  }
132}