001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.Abortable; 022import org.apache.hadoop.hbase.HBaseInterfaceAudience; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.conf.ConfigurationObserver; 025import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.yetus.audience.InterfaceStability; 028 029/** 030 * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), 031 * high-priority ('priority'), and replication ('replication') requests. Default behavior is to 032 * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. See 033 * below article for explanation of options. 034 * @see <a href= 035 * "http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview 036 * on Request Queuing</a> 037 */ 038@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 039@InterfaceStability.Evolving 040public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { 041 private int port; 042 private final PriorityFunction priority; 043 private final RpcExecutor callExecutor; 044 private final RpcExecutor priorityExecutor; 045 private final RpcExecutor replicationExecutor; 046 047 /** 048 * This executor is only for meta transition 049 */ 050 private final RpcExecutor metaTransitionExecutor; 051 052 private final RpcExecutor bulkloadExecutor; 053 054 /** What level a high priority call is at. */ 055 private final int highPriorityLevel; 056 057 private Abortable abortable = null; 058 059 /** 060 * @param handlerCount the number of handler threads that will be used to process calls 061 * @param priorityHandlerCount How many threads for priority handling. 062 * @param replicationHandlerCount How many threads for replication handling. 063 * @param priority Function to extract request priority. 064 */ 065 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 066 int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, 067 Abortable server, int highPriorityLevel) { 068 int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, 069 HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); 070 int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 071 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 072 int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, 073 priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 074 int maxReplicationQueueLength = 075 conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, 076 replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 077 int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, 078 bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 079 080 this.priority = priority; 081 this.highPriorityLevel = highPriorityLevel; 082 this.abortable = server; 083 084 String callQueueType = 085 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 086 float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); 087 088 if (callqReadShare > 0) { 089 // at least 1 read handler and 1 write handler 090 callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), 091 maxQueueLength, priority, conf, server); 092 } else { 093 if ( 094 RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) 095 || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf) 096 ) { 097 callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, 098 maxQueueLength, priority, conf, server); 099 } else { 100 callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, 101 priority, conf, server); 102 } 103 } 104 105 float metaCallqReadShare = 106 conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 107 MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); 108 if (metaCallqReadShare > 0) { 109 // different read/write handler for meta, at least 1 read handler and 1 write handler 110 this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ", 111 Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server); 112 } else { 113 // Create 2 queues to help priorityExecutor be more scalable. 114 this.priorityExecutor = priorityHandlerCount > 0 115 ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, 116 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 117 abortable) 118 : null; 119 } 120 this.replicationExecutor = replicationHandlerCount > 0 121 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, 122 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, 123 abortable) 124 : null; 125 126 this.metaTransitionExecutor = metaTransitionHandler > 0 127 ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, 128 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, 129 abortable) 130 : null; 131 this.bulkloadExecutor = bulkLoadHandlerCount > 0 132 ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount, 133 RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf, 134 abortable) 135 : null; 136 } 137 138 public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 139 int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { 140 this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null, 141 highPriorityLevel); 142 } 143 144 /** 145 * Resize call queues; 146 * @param conf new configuration 147 */ 148 @Override 149 public void onConfigurationChange(Configuration conf) { 150 callExecutor.resizeQueues(conf); 151 if (priorityExecutor != null) { 152 priorityExecutor.resizeQueues(conf); 153 } 154 if (replicationExecutor != null) { 155 replicationExecutor.resizeQueues(conf); 156 } 157 if (metaTransitionExecutor != null) { 158 metaTransitionExecutor.resizeQueues(conf); 159 } 160 if (bulkloadExecutor != null) { 161 bulkloadExecutor.resizeQueues(conf); 162 } 163 164 String callQueueType = 165 conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); 166 if ( 167 RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueType(callQueueType) 168 ) { 169 callExecutor.onConfigurationChange(conf); 170 } 171 } 172 173 @Override 174 public void init(Context context) { 175 this.port = context.getListenerAddress().getPort(); 176 } 177 178 @Override 179 public void start() { 180 callExecutor.start(port); 181 if (priorityExecutor != null) { 182 priorityExecutor.start(port); 183 } 184 if (replicationExecutor != null) { 185 replicationExecutor.start(port); 186 } 187 if (metaTransitionExecutor != null) { 188 metaTransitionExecutor.start(port); 189 } 190 if (bulkloadExecutor != null) { 191 bulkloadExecutor.start(port); 192 } 193 194 } 195 196 @Override 197 public void stop() { 198 callExecutor.stop(); 199 if (priorityExecutor != null) { 200 priorityExecutor.stop(); 201 } 202 if (replicationExecutor != null) { 203 replicationExecutor.stop(); 204 } 205 if (metaTransitionExecutor != null) { 206 metaTransitionExecutor.stop(); 207 } 208 if (bulkloadExecutor != null) { 209 bulkloadExecutor.stop(); 210 } 211 212 } 213 214 @Override 215 public boolean dispatch(CallRunner callTask) { 216 RpcCall call = callTask.getRpcCall(); 217 int level = 218 priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser().orElse(null)); 219 if (level == HConstants.PRIORITY_UNSET) { 220 level = HConstants.NORMAL_QOS; 221 } 222 if ( 223 metaTransitionExecutor != null 224 && level == MasterAnnotationReadingPriorityFunction.META_TRANSITION_QOS 225 ) { 226 return metaTransitionExecutor.dispatch(callTask); 227 } else if (priorityExecutor != null && level > highPriorityLevel) { 228 return priorityExecutor.dispatch(callTask); 229 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { 230 return replicationExecutor.dispatch(callTask); 231 } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) { 232 return bulkloadExecutor.dispatch(callTask); 233 } else { 234 return callExecutor.dispatch(callTask); 235 } 236 } 237 238 @Override 239 public int getMetaPriorityQueueLength() { 240 return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength(); 241 } 242 243 @Override 244 public int getGeneralQueueLength() { 245 return callExecutor.getQueueLength(); 246 } 247 248 @Override 249 public int getPriorityQueueLength() { 250 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); 251 } 252 253 @Override 254 public int getReplicationQueueLength() { 255 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); 256 } 257 258 @Override 259 public int getBulkLoadQueueLength() { 260 return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength(); 261 } 262 263 @Override 264 public int getActiveRpcHandlerCount() { 265 return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount() 266 + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount() 267 + getActiveBulkLoadRpcHandlerCount(); 268 } 269 270 @Override 271 public int getActiveMetaPriorityRpcHandlerCount() { 272 return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount()); 273 } 274 275 @Override 276 public int getActiveGeneralRpcHandlerCount() { 277 return callExecutor.getActiveHandlerCount(); 278 } 279 280 @Override 281 public int getActivePriorityRpcHandlerCount() { 282 return (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()); 283 } 284 285 @Override 286 public int getActiveReplicationRpcHandlerCount() { 287 return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); 288 } 289 290 @Override 291 public int getActiveBulkLoadRpcHandlerCount() { 292 return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount(); 293 } 294 295 @Override 296 public long getNumGeneralCallsDropped() { 297 return callExecutor.getNumGeneralCallsDropped(); 298 } 299 300 @Override 301 public long getNumLifoModeSwitches() { 302 return callExecutor.getNumLifoModeSwitches(); 303 } 304 305 @Override 306 public int getWriteQueueLength() { 307 return callExecutor.getWriteQueueLength(); 308 } 309 310 @Override 311 public int getReadQueueLength() { 312 return callExecutor.getReadQueueLength(); 313 } 314 315 @Override 316 public int getScanQueueLength() { 317 return callExecutor.getScanQueueLength(); 318 } 319 320 @Override 321 public int getActiveWriteRpcHandlerCount() { 322 return callExecutor.getActiveWriteHandlerCount(); 323 } 324 325 @Override 326 public int getActiveReadRpcHandlerCount() { 327 return callExecutor.getActiveReadHandlerCount(); 328 } 329 330 @Override 331 public int getActiveScanRpcHandlerCount() { 332 return callExecutor.getActiveScanHandlerCount(); 333 } 334 335 @Override 336 public CallQueueInfo getCallQueueInfo() { 337 String queueName; 338 339 CallQueueInfo callQueueInfo = new CallQueueInfo(); 340 341 if (null != callExecutor) { 342 queueName = "Call Queue"; 343 callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary()); 344 callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary()); 345 } 346 347 if (null != priorityExecutor) { 348 queueName = "Priority Queue"; 349 callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary()); 350 callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary()); 351 } 352 353 if (null != replicationExecutor) { 354 queueName = "Replication Queue"; 355 callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary()); 356 callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary()); 357 } 358 359 if (null != metaTransitionExecutor) { 360 queueName = "Meta Transition Queue"; 361 callQueueInfo.setCallMethodCount(queueName, 362 metaTransitionExecutor.getCallQueueCountsSummary()); 363 callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); 364 } 365 366 if (null != bulkloadExecutor) { 367 queueName = "BulkLoad Queue"; 368 callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary()); 369 callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary()); 370 } 371 372 return callQueueInfo; 373 } 374 375}