001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Queue;
021import java.util.concurrent.atomic.AtomicInteger;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Abortable;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.conf.ConfigurationObserver;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.Message;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
040
041/**
042 * RPC Executor that uses different queues for reads and writes. With the options to use different
043 * queues/executors for gets and scans. Each handler has its own queue and there is no stealing.
044 */
045@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
046@InterfaceStability.Evolving
047public class RWQueueRpcExecutor extends RpcExecutor {
048  private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
049
050  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
051    "hbase.ipc.server.callqueue.read.ratio";
052  public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
053    "hbase.ipc.server.callqueue.scan.ratio";
054
055  private final QueueBalancer writeBalancer;
056  private final QueueBalancer readBalancer;
057  private final QueueBalancer scanBalancer;
058  private final int writeHandlersCount;
059  private final int readHandlersCount;
060  private final int scanHandlersCount;
061  private final int numWriteQueues;
062  private final int numReadQueues;
063  private final int numScanQueues;
064
065  private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
066  private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
067  private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
068
069  public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
070    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
071    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
072
073    float callqReadShare = getReadShare(conf);
074    float callqScanShare = getScanShare(conf);
075
076    numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
077    writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
078
079    int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
080    int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
081
082    int scanHandlers = Math.max(0, (int) Math.floor(readHandlers * callqScanShare));
083    int scanQueues =
084      scanHandlers > 0 ? Math.max(1, (int) Math.floor(readQueues * callqScanShare)) : 0;
085
086    if (scanQueues > 0) {
087      // if scanQueues > 0, the handler count of read should > 0, then we make readQueues >= 1
088      readQueues = Math.max(1, readQueues - scanQueues);
089      readHandlers -= scanHandlers;
090    } else {
091      scanQueues = 0;
092      scanHandlers = 0;
093    }
094
095    numReadQueues = readQueues;
096    readHandlersCount = readHandlers;
097    numScanQueues = scanQueues;
098    scanHandlersCount = scanHandlers;
099
100    initializeQueues(numWriteQueues);
101    initializeQueues(numReadQueues);
102    initializeQueues(numScanQueues);
103
104    this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
105    this.readBalancer =
106      getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
107    this.scanBalancer = numScanQueues > 0
108      ? getBalancer(name, conf,
109        queues.subList(numWriteQueues + numReadQueues,
110          numWriteQueues + numReadQueues + numScanQueues))
111      : null;
112
113    LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
114      + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
115      + numScanQueues + " scanHandlers=" + scanHandlersCount);
116  }
117
118  @Override
119  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
120    // at least 1 read queue and 1 write queue
121    return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
122  }
123
124  @Override
125  protected void startHandlers(final int port) {
126    startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
127      activeWriteHandlerCount);
128    startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
129      activeReadHandlerCount);
130    if (numScanQueues > 0) {
131      startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
132        numScanQueues, port, activeScanHandlerCount);
133    }
134  }
135
136  @Override
137  public boolean dispatch(final CallRunner callTask) {
138    RpcCall call = callTask.getRpcCall();
139    return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
140      shouldDispatchToScanQueue(callTask), callTask);
141  }
142
143  protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
144    final CallRunner callTask) {
145    int queueIndex;
146    if (toWriteQueue) {
147      queueIndex = writeBalancer.getNextQueue(callTask);
148    } else if (toScanQueue) {
149      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
150    } else {
151      queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
152    }
153
154    Queue<CallRunner> queue = queues.get(queueIndex);
155    if (queue.size() >= currentQueueLimit) {
156      return false;
157    }
158    return queue.offer(callTask);
159  }
160
161  @Override
162  public int getWriteQueueLength() {
163    int length = 0;
164    for (int i = 0; i < numWriteQueues; i++) {
165      length += queues.get(i).size();
166    }
167    return length;
168  }
169
170  @Override
171  public int getReadQueueLength() {
172    int length = 0;
173    for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
174      length += queues.get(i).size();
175    }
176    return length;
177  }
178
179  @Override
180  public int getScanQueueLength() {
181    int length = 0;
182    for (int i = numWriteQueues + numReadQueues; i
183        < (numWriteQueues + numReadQueues + numScanQueues); i++) {
184      length += queues.get(i).size();
185    }
186    return length;
187  }
188
189  @Override
190  public int getActiveHandlerCount() {
191    return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
192      + activeScanHandlerCount.get();
193  }
194
195  @Override
196  public int getActiveWriteHandlerCount() {
197    return activeWriteHandlerCount.get();
198  }
199
200  @Override
201  public int getActiveReadHandlerCount() {
202    return activeReadHandlerCount.get();
203  }
204
205  @Override
206  public int getActiveScanHandlerCount() {
207    return activeScanHandlerCount.get();
208  }
209
210  protected boolean isWriteRequest(final RequestHeader header, final Message param) {
211    // TODO: Is there a better way to do this?
212    if (param instanceof MultiRequest) {
213      MultiRequest multi = (MultiRequest) param;
214      for (RegionAction regionAction : multi.getRegionActionList()) {
215        for (Action action : regionAction.getActionList()) {
216          if (action.hasMutation()) {
217            return true;
218          }
219        }
220      }
221    }
222    if (param instanceof MutateRequest) {
223      return true;
224    }
225    // Below here are methods for master. It's a pretty brittle version of this.
226    // Not sure that master actually needs a read/write queue since 90% of requests to
227    // master are writing to status or changing the meta table.
228    // All other read requests are admin generated and can be processed whenever.
229    // However changing that would require a pretty drastic change and should be done for
230    // the next major release and not as a fix for HBASE-14239
231    if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
232      return true;
233    }
234    if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
235      return true;
236    }
237    if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
238      return true;
239    }
240    return false;
241  }
242
243  QueueBalancer getWriteBalancer() {
244    return writeBalancer;
245  }
246
247  QueueBalancer getReadBalancer() {
248    return readBalancer;
249  }
250
251  QueueBalancer getScanBalancer() {
252    return scanBalancer;
253  }
254
255  private boolean isScanRequest(final RequestHeader header, final Message param) {
256    return param instanceof ScanRequest;
257  }
258
259  protected boolean shouldDispatchToScanQueue(final CallRunner task) {
260    RpcCall call = task.getRpcCall();
261    return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
262  }
263
264  protected float getReadShare(final Configuration conf) {
265    return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
266  }
267
268  protected float getScanShare(final Configuration conf) {
269    return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
270  }
271
272  /*
273   * Calculate the number of writers based on the "total count" and the read share. You'll get at
274   * least one writer.
275   */
276  private static int calcNumWriters(final int count, final float readShare) {
277    return Math.max(1, count - Math.max(1, (int) Math.round(count * readShare)));
278  }
279
280  /*
281   * Calculate the number of readers based on the "total count" and the read share. You'll get at
282   * least one reader.
283   */
284  private static int calcNumReaders(final int count, final float readShare) {
285    return count - calcNumWriters(count, readShare);
286  }
287
288  @Override
289  public void onConfigurationChange(Configuration conf) {
290    super.onConfigurationChange(conf);
291    propagateBalancerConfigChange(writeBalancer, conf);
292    propagateBalancerConfigChange(readBalancer, conf);
293    propagateBalancerConfigChange(scanBalancer, conf);
294  }
295
296  private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
297    if (balancer instanceof ConfigurationObserver) {
298      ((ConfigurationObserver) balancer).onConfigurationChange(conf);
299    }
300  }
301}