001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.hadoop.hbase.HBaseInterfaceAudience;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.conf.ConfigurationObserver;
025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028
029/**
030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See
033 * below article for explanation of options.
034 * @see <a href=
035 *      "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview
036 *      on Request Queuing</a>
037 */
038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
039@InterfaceStability.Evolving
040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
041  private int port;
042  private final PriorityFunction priority;
043  private final RpcExecutor callExecutor;
044  private final RpcExecutor priorityExecutor;
045  private final RpcExecutor replicationExecutor;
046
047  /**
048   * This executor is only for meta transition
049   */
050  private final RpcExecutor metaTransitionExecutor;
051
052  private final RpcExecutor bulkloadExecutor;
053
054  /** What level a high priority call is at. */
055  private final int highPriorityLevel;
056
057  private Abortable abortable = null;
058
059  /**
060   * @param handlerCount            the number of handler threads that will be used to process calls
061   * @param priorityHandlerCount    How many threads for priority handling.
062   * @param replicationHandlerCount How many threads for replication handling.
063   * @param priority                Function to extract request priority.
064   */
065  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
066    int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
067    Abortable server, int highPriorityLevel) {
068    int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
069      HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
070    int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
071      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
072    int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
073      priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
074    int maxReplicationQueueLength =
075      conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
076        replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
077    int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
078      bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
079
080    this.priority = priority;
081    this.highPriorityLevel = highPriorityLevel;
082    this.abortable = server;
083
084    String callQueueType =
085      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
086    float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
087
088    if (callqReadShare > 0) {
089      // at least 1 read handler and 1 write handler
090      callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
091        maxQueueLength, priority, conf, server);
092    } else {
093      if (
094        RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)
095          || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)
096      ) {
097        callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
098          maxQueueLength, priority, conf, server);
099      } else {
100        callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
101          priority, conf, server);
102      }
103    }
104
105    float metaCallqReadShare =
106      conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
107        MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
108    if (metaCallqReadShare > 0) {
109      // different read/write handler for meta, at least 1 read handler and 1 write handler
110      this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ",
111        Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server);
112    } else {
113      // Create 2 queues to help priorityExecutor be more scalable.
114      this.priorityExecutor = priorityHandlerCount > 0
115        ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
116          RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
117          abortable)
118        : null;
119    }
120    this.replicationExecutor = replicationHandlerCount > 0
121      ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
122        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf,
123        abortable)
124      : null;
125
126    this.metaTransitionExecutor = metaTransitionHandler > 0
127      ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
128        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
129        abortable)
130      : null;
131    this.bulkloadExecutor = bulkLoadHandlerCount > 0
132      ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount,
133        RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf,
134        abortable)
135      : null;
136  }
137
138  public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
139    int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
140    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
141      highPriorityLevel);
142  }
143
144  /**
145   * Resize call queues;
146   * @param conf new configuration
147   */
148  @Override
149  public void onConfigurationChange(Configuration conf) {
150    callExecutor.resizeQueues(conf);
151    if (priorityExecutor != null) {
152      priorityExecutor.resizeQueues(conf);
153    }
154    if (replicationExecutor != null) {
155      replicationExecutor.resizeQueues(conf);
156    }
157    if (metaTransitionExecutor != null) {
158      metaTransitionExecutor.resizeQueues(conf);
159    }
160    if (bulkloadExecutor != null) {
161      bulkloadExecutor.resizeQueues(conf);
162    }
163
164    String callQueueType =
165      conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
166    if (
167      RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType)
168    ) {
169      callExecutor.onConfigurationChange(conf);
170    }
171  }
172
173  @Override
174  public void init(Context context) {
175    this.port = context.getListenerAddress().getPort();
176  }
177
178  @Override
179  public void start() {
180    callExecutor.start(port);
181    if (priorityExecutor != null) {
182      priorityExecutor.start(port);
183    }
184    if (replicationExecutor != null) {
185      replicationExecutor.start(port);
186    }
187    if (metaTransitionExecutor != null) {
188      metaTransitionExecutor.start(port);
189    }
190    if (bulkloadExecutor != null) {
191      bulkloadExecutor.start(port);
192    }
193
194  }
195
196  @Override
197  public void stop() {
198    callExecutor.stop();
199    if (priorityExecutor != null) {
200      priorityExecutor.stop();
201    }
202    if (replicationExecutor != null) {
203      replicationExecutor.stop();
204    }
205    if (metaTransitionExecutor != null) {
206      metaTransitionExecutor.stop();
207    }
208    if (bulkloadExecutor != null) {
209      bulkloadExecutor.stop();
210    }
211
212  }
213
214  @Override
215  public boolean dispatch(CallRunner callTask) {
216    RpcCall call = callTask.getRpcCall();
217    int level =
218      priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null));
219    if (level == HConstants.PRIORITY_UNSET) {
220      level = HConstants.NORMAL_QOS;
221    }
222    if (
223      metaTransitionExecutor != null
224        && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS
225    ) {
226      return metaTransitionExecutor.dispatch(callTask);
227    } else if (priorityExecutor != null && level > highPriorityLevel) {
228      return priorityExecutor.dispatch(callTask);
229    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
230      return replicationExecutor.dispatch(callTask);
231    } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
232      return bulkloadExecutor.dispatch(callTask);
233    } else {
234      return callExecutor.dispatch(callTask);
235    }
236  }
237
238  @Override
239  public int getMetaPriorityQueueLength() {
240    return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
241  }
242
243  @Override
244  public int getGeneralQueueLength() {
245    return callExecutor.getQueueLength();
246  }
247
248  @Override
249  public int getPriorityQueueLength() {
250    return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
251  }
252
253  @Override
254  public int getReplicationQueueLength() {
255    return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
256  }
257
258  @Override
259  public int getBulkLoadQueueLength() {
260    return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
261  }
262
263  @Override
264  public int getActiveRpcHandlerCount() {
265    return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
266      + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount()
267      + getActiveBulkLoadRpcHandlerCount();
268  }
269
270  @Override
271  public int getActiveMetaPriorityRpcHandlerCount() {
272    return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
273  }
274
275  @Override
276  public int getActiveGeneralRpcHandlerCount() {
277    return callExecutor.getActiveHandlerCount();
278  }
279
280  @Override
281  public int getActivePriorityRpcHandlerCount() {
282    return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount());
283  }
284
285  @Override
286  public int getActiveReplicationRpcHandlerCount() {
287    return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
288  }
289
290  @Override
291  public int getActiveBulkLoadRpcHandlerCount() {
292    return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount();
293  }
294
295  @Override
296  public long getNumGeneralCallsDropped() {
297    return callExecutor.getNumGeneralCallsDropped();
298  }
299
300  @Override
301  public long getNumLifoModeSwitches() {
302    return callExecutor.getNumLifoModeSwitches();
303  }
304
305  @Override
306  public int getWriteQueueLength() {
307    return callExecutor.getWriteQueueLength();
308  }
309
310  @Override
311  public int getReadQueueLength() {
312    return callExecutor.getReadQueueLength();
313  }
314
315  @Override
316  public int getScanQueueLength() {
317    return callExecutor.getScanQueueLength();
318  }
319
320  @Override
321  public int getActiveWriteRpcHandlerCount() {
322    return callExecutor.getActiveWriteHandlerCount();
323  }
324
325  @Override
326  public int getActiveReadRpcHandlerCount() {
327    return callExecutor.getActiveReadHandlerCount();
328  }
329
330  @Override
331  public int getActiveScanRpcHandlerCount() {
332    return callExecutor.getActiveScanHandlerCount();
333  }
334
335  @Override
336  public CallQueueInfo getCallQueueInfo() {
337    String queueName;
338
339    CallQueueInfo callQueueInfo = new CallQueueInfo();
340
341    if (null != callExecutor) {
342      queueName = "Call Queue";
343      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
344      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
345    }
346
347    if (null != priorityExecutor) {
348      queueName = "Priority Queue";
349      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
350      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
351    }
352
353    if (null != replicationExecutor) {
354      queueName = "Replication Queue";
355      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
356      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
357    }
358
359    if (null != metaTransitionExecutor) {
360      queueName = "Meta Transition Queue";
361      callQueueInfo.setCallMethodCount(queueName,
362        metaTransitionExecutor.getCallQueueCountsSummary());
363      callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
364    }
365
366    if (null != bulkloadExecutor) {
367      queueName = "BulkLoad Queue";
368      callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary());
369      callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary());
370    }
371
372    return callQueueInfo;
373  }
374
375}