001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.concurrent.BlockingQueue; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.Abortable; 023import org.apache.hadoop.hbase.HBaseInterfaceAudience; 024import org.apache.hadoop.hbase.conf.ConfigurationObserver; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027 028/** 029 * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains 030 * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to FIFO but 031 * you can pass an alternate queue class to use. 032 */ 033@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 034@InterfaceStability.Evolving 035public class BalancedQueueRpcExecutor extends RpcExecutor { 036 037 private final QueueBalancer balancer; 038 039 public BalancedQueueRpcExecutor(final String name, final int handlerCount, 040 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 041 final Abortable abortable) { 042 this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), 043 maxQueueLength, priority, conf, abortable); 044 } 045 046 public BalancedQueueRpcExecutor(final String name, final int handlerCount, 047 final String callQueueType, final int maxQueueLength, final PriorityFunction priority, 048 final Configuration conf, final Abortable abortable) { 049 super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); 050 initializeQueues(this.numCallQueues); 051 this.balancer = getBalancer(name, conf, getQueues()); 052 } 053 054 @Override 055 public boolean dispatch(final CallRunner callTask) { 056 int queueIndex = balancer.getNextQueue(callTask); 057 BlockingQueue<CallRunner> queue = queues.get(queueIndex); 058 // that means we can overflow by at most <num reader> size (5), that's ok 059 if (queue.size() >= currentQueueLimit) { 060 return false; 061 } 062 return queue.offer(callTask); 063 } 064 065 @Override 066 public void onConfigurationChange(Configuration conf) { 067 super.onConfigurationChange(conf); 068 069 if (balancer instanceof ConfigurationObserver) { 070 ((ConfigurationObserver) balancer).onConfigurationChange(conf); 071 } 072 } 073}