001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.atomic.AtomicInteger; 022import org.apache.hadoop.hbase.Abortable; 023import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 024import org.apache.hadoop.util.StringUtils; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Thread to handle rpc call. Should only be used in {@link RpcExecutor} and its sub-classes. 031 */ 032@InterfaceAudience.Private 033public class RpcHandler extends Thread { 034 private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class); 035 036 /** 037 * Q to find CallRunners to run in. 038 */ 039 final BlockingQueue<CallRunner> q; 040 041 final int handlerCount; 042 final double handlerFailureThreshhold; 043 044 // metrics (shared with other handlers) 045 final AtomicInteger activeHandlerCount; 046 final AtomicInteger failedHandlerCount; 047 048 // The up-level RpcServer. 049 final Abortable abortable; 050 051 private boolean running; 052 053 RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount, 054 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount, 055 final AtomicInteger failedHandlerCount, final Abortable abortable) { 056 super(name); 057 setDaemon(true); 058 this.q = q; 059 this.handlerFailureThreshhold = handlerFailureThreshhold; 060 this.activeHandlerCount = activeHandlerCount; 061 this.failedHandlerCount = failedHandlerCount; 062 this.handlerCount = handlerCount; 063 this.abortable = abortable; 064 } 065 066 /** 067 * @return A {@link CallRunner} 068 * @throws InterruptedException thrown by {@link BlockingQueue#take()} 069 */ 070 protected CallRunner getCallRunner() throws InterruptedException { 071 return this.q.take(); 072 } 073 074 public void stopRunning() { 075 running = false; 076 } 077 078 @Override 079 public void run() { 080 boolean interrupted = false; 081 running = true; 082 try { 083 while (running) { 084 try { 085 run(getCallRunner()); 086 } catch (InterruptedException e) { 087 interrupted = true; 088 } 089 } 090 } catch (Exception e) { 091 LOG.warn(e.toString(), e); 092 throw e; 093 } finally { 094 if (interrupted) { 095 Thread.currentThread().interrupt(); 096 } 097 } 098 } 099 100 private void run(CallRunner cr) { 101 MonitoredRPCHandler status = RpcServer.getStatus(); 102 cr.setStatus(status); 103 try { 104 this.activeHandlerCount.incrementAndGet(); 105 cr.run(); 106 } catch (Throwable e) { 107 if (e instanceof Error) { 108 int failedCount = failedHandlerCount.incrementAndGet(); 109 if ( 110 this.handlerFailureThreshhold >= 0 111 && failedCount > handlerCount * this.handlerFailureThreshhold 112 ) { 113 String message = "Number of failed RpcServer handler runs exceeded threshhold " 114 + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); 115 if (abortable != null) { 116 abortable.abort(message, e); 117 } else { 118 LOG.error("Error but can't abort because abortable is null: " 119 + StringUtils.stringifyException(e)); 120 throw e; 121 } 122 } else { 123 LOG.warn("Handler errors " + StringUtils.stringifyException(e)); 124 } 125 } else { 126 LOG.warn("Handler exception " + StringUtils.stringifyException(e)); 127 } 128 } finally { 129 this.activeHandlerCount.decrementAndGet(); 130 } 131 } 132}