001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021 022import com.google.errorprone.annotations.RestrictedApi; 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.nio.ByteBuffer; 027import java.nio.channels.ReadableByteChannel; 028import java.nio.channels.WritableByteChannel; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Locale; 033import java.util.Map; 034import java.util.Optional; 035import java.util.concurrent.atomic.LongAdder; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.CallQueueTooBigException; 039import org.apache.hadoop.hbase.CellScanner; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.Server; 043import org.apache.hadoop.hbase.conf.ConfigurationObserver; 044import org.apache.hadoop.hbase.io.ByteBuffAllocator; 045import 
org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 046import org.apache.hadoop.hbase.monitoring.TaskMonitor; 047import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 048import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 049import org.apache.hadoop.hbase.regionserver.RSRpcServices; 050import org.apache.hadoop.hbase.security.HBasePolicyProvider; 051import org.apache.hadoop.hbase.security.SaslUtil; 052import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.security.UserProvider; 055import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.util.GsonUtil; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.hadoop.security.authorize.AuthorizationException; 061import org.apache.hadoop.security.authorize.PolicyProvider; 062import org.apache.hadoop.security.authorize.ProxyUsers; 063import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 064import org.apache.hadoop.security.token.SecretManager; 065import org.apache.hadoop.security.token.TokenIdentifier; 066import org.apache.yetus.audience.InterfaceAudience; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 071import org.apache.hbase.thirdparty.com.google.gson.Gson; 072import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 073import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 074import org.apache.hbase.thirdparty.com.google.protobuf.Message; 075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 076import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 077 078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 079import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 081 082/** 083 * An RPC server that hosts protobuf described Services. 084 */ 085@InterfaceAudience.Private 086public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver { 087 // LOG is being used in CallRunner and the log level is being changed in tests 088 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class); 089 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = 090 new CallQueueTooBigException(); 091 092 private static final String MULTI_GETS = "multi.gets"; 093 private static final String MULTI_MUTATIONS = "multi.mutations"; 094 private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; 095 096 private final boolean authorize; 097 private volatile boolean isOnlineLogProviderEnabled; 098 protected boolean isSecurityEnabled; 099 100 public static final byte CURRENT_VERSION = 0; 101 102 /** 103 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. 104 */ 105 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = 106 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 107 108 /** 109 * How many calls/handler are allowed in the queue. 110 */ 111 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; 112 113 protected final CellBlockBuilder cellBlockBuilder; 114 115 protected static final String AUTH_FAILED_FOR = "Auth failed for "; 116 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; 117 protected static final Logger AUDITLOG = 118 LoggerFactory.getLogger("SecurityLogger." 
+ Server.class.getName()); 119 protected SecretManager<TokenIdentifier> secretManager; 120 protected final Map<String, String> saslProps; 121 protected final String serverPrincipal; 122 123 protected ServiceAuthorizationManager authManager; 124 125 /** 126 * This is set to Call object before Handler invokes an RPC and ybdie after the call returns. 127 */ 128 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); 129 130 /** Keeps MonitoredRPCHandler per handler thread. */ 131 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 132 133 protected final InetSocketAddress bindAddress; 134 135 protected MetricsHBaseServer metrics; 136 137 protected final Configuration conf; 138 139 /** 140 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over 141 * this size, then we will reject the call (after parsing it though). It will go back to the 142 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The 143 * call queue size gets incremented after we parse a call and before we add it to the queue of 144 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current 145 * size is kept in {@link #callQueueSizeInBytes}. 146 * @see #callQueueSizeInBytes 147 * @see #DEFAULT_MAX_CALLQUEUE_SIZE 148 */ 149 protected final long maxQueueSizeInBytes; 150 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; 151 152 /** 153 * This is a running count of the size in bytes of all outstanding calls whether currently 154 * executing or queued waiting to be run. 155 */ 156 protected final LongAdder callQueueSizeInBytes = new LongAdder(); 157 158 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 159 protected final boolean tcpKeepAlive; // if T then use keepalives 160 161 /** 162 * This flag is used to indicate to sub threads when they should go down. 
When we call 163 * {@link #start()}, all threads started will consult this flag on whether they should keep going. 164 * It is set to false when {@link #stop()} is called. 165 */ 166 volatile boolean running = true; 167 168 /** 169 * This flag is set to true after all threads are up and 'running' and the server is then opened 170 * for business by the call to {@link #start()}. 171 */ 172 volatile boolean started = false; 173 174 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 175 176 protected HBaseRPCErrorHandler errorHandler = null; 177 178 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; 179 180 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; 181 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; 182 protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan"; 183 protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan"; 184 185 /** 186 * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists 187 * to prevent the rpc service regarding this request as timeout immediately. 
188 */ 189 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; 190 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; 191 192 /** Default value for above params */ 193 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M 194 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds 195 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; 196 197 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; 198 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; 199 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>"; 200 201 protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create(); 202 203 protected final int maxRequestSize; 204 protected volatile int warnResponseTime; 205 protected volatile int warnResponseSize; 206 protected volatile int warnScanResponseTime; 207 protected volatile int warnScanResponseSize; 208 209 protected final int minClientRequestTimeout; 210 211 protected final Server server; 212 protected final List<BlockingServiceAndInterface> services; 213 214 protected final RpcScheduler scheduler; 215 216 protected final UserProvider userProvider; 217 218 protected final ByteBuffAllocator bbAllocator; 219 220 protected volatile boolean allowFallbackToSimpleAuth; 221 222 /** 223 * Used to get details for scan with a scanner_id<br/> 224 * TODO try to figure out a better way and remove reference from regionserver package later. 225 */ 226 private RSRpcServices rsRpcServices; 227 228 /** 229 * Use to add online slowlog responses 230 */ 231 private NamedQueueRecorder namedQueueRecorder; 232 233 @FunctionalInterface 234 protected interface CallCleanup { 235 void run(); 236 } 237 238 /** 239 * Datastructure for passing a {@link BlockingService} and its associated class of protobuf 240 * service interface. 
For example, a server that fielded what is defined in the client protobuf 241 * service would pass in an implementation of the client blocking service and then its 242 * ClientService.BlockingInterface.class. Used checking connection setup. 243 */ 244 public static class BlockingServiceAndInterface { 245 private final BlockingService service; 246 private final Class<?> serviceInterface; 247 248 public BlockingServiceAndInterface(final BlockingService service, 249 final Class<?> serviceInterface) { 250 this.service = service; 251 this.serviceInterface = serviceInterface; 252 } 253 254 public Class<?> getServiceInterface() { 255 return this.serviceInterface; 256 } 257 258 public BlockingService getBlockingService() { 259 return this.service; 260 } 261 } 262 263 /** 264 * Constructs a server listening on the named port and address. 265 * @param server hosting instance of {@link Server}. We will do authentications if an 266 * instance else pass null for no authentication check. 267 * @param name Used keying this rpc servers' metrics and for naming the Listener 268 * thread. 269 * @param services A list of services. 270 * @param bindAddress Where to listen 271 * @param reservoirEnabled Enable ByteBufferPool or not. 272 */ 273 public RpcServer(final Server server, final String name, 274 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 275 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 276 this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled); 277 this.server = server; 278 this.services = services; 279 this.bindAddress = bindAddress; 280 this.conf = conf; 281 // See declaration above for documentation on what this size is. 
282 this.maxQueueSizeInBytes = 283 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); 284 285 this.warnResponseTime = getWarnResponseTime(conf); 286 this.warnResponseSize = getWarnResponseSize(conf); 287 this.warnScanResponseTime = getWarnScanResponseTime(conf); 288 this.warnScanResponseSize = getWarnScanResponseSize(conf); 289 this.minClientRequestTimeout = 290 conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); 291 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); 292 293 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); 294 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); 295 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); 296 297 this.cellBlockBuilder = new CellBlockBuilder(conf); 298 299 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); 300 this.userProvider = UserProvider.instantiate(conf); 301 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); 302 if (isSecurityEnabled) { 303 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 304 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); 305 serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(), 306 "can not get current user name when security is enabled"); 307 } else { 308 saslProps = Collections.emptyMap(); 309 serverPrincipal = HConstants.EMPTY_STRING; 310 } 311 312 this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf); 313 this.scheduler = scheduler; 314 } 315 316 @Override 317 public void onConfigurationChange(Configuration newConf) { 318 initReconfigurable(newConf); 319 if (scheduler instanceof ConfigurationObserver) { 320 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf); 321 } 322 if (authorize) { 323 refreshAuthManager(newConf, new HBasePolicyProvider()); 324 } 325 refreshSlowLogConfiguration(newConf); 
326 } 327 328 private void refreshSlowLogConfiguration(Configuration newConf) { 329 boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf); 330 if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) { 331 isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled; 332 } 333 int newWarnResponseTime = getWarnResponseTime(newConf); 334 if (warnResponseTime != newWarnResponseTime) { 335 warnResponseTime = newWarnResponseTime; 336 } 337 int newWarnResponseSize = getWarnResponseSize(newConf); 338 if (warnResponseSize != newWarnResponseSize) { 339 warnResponseSize = newWarnResponseSize; 340 } 341 int newWarnResponseTimeScan = getWarnScanResponseTime(newConf); 342 if (warnScanResponseTime != newWarnResponseTimeScan) { 343 warnScanResponseTime = newWarnResponseTimeScan; 344 } 345 int newWarnScanResponseSize = getWarnScanResponseSize(newConf); 346 if (warnScanResponseSize != newWarnScanResponseSize) { 347 warnScanResponseSize = newWarnScanResponseSize; 348 } 349 } 350 351 private static boolean getIsOnlineLogProviderEnabled(Configuration conf) { 352 return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 353 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 354 } 355 356 private static int getWarnResponseTime(Configuration conf) { 357 return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); 358 } 359 360 private static int getWarnResponseSize(Configuration conf) { 361 return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); 362 } 363 364 private static int getWarnScanResponseTime(Configuration conf) { 365 return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf)); 366 } 367 368 private static int getWarnScanResponseSize(Configuration conf) { 369 return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf)); 370 } 371 372 protected void initReconfigurable(Configuration confToLoad) { 373 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); 374 if 
(isSecurityEnabled && allowFallbackToSimpleAuth) { 375 LOG.warn("********* WARNING! *********"); 376 LOG.warn("This server is configured to allow connections from INSECURE clients"); 377 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); 378 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); 379 LOG.warn("impersonation is possible!"); 380 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); 381 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); 382 LOG.warn("****************************"); 383 } 384 } 385 386 Configuration getConf() { 387 return conf; 388 } 389 390 @Override 391 public boolean isStarted() { 392 return this.started; 393 } 394 395 @Override 396 public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) { 397 // Ignore warnings that this should be accessed in a static way instead of via an instance; 398 // it'll break if you go via static route. 399 System.setProperty("hadoop.policy.file", "hbase-policy.xml"); 400 this.authManager.refresh(conf, pp); 401 LOG.info("Refreshed hbase-policy.xml successfully"); 402 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 403 LOG.info("Refreshed super and proxy users successfully"); 404 } 405 406 protected AuthenticationTokenSecretManager createSecretManager() { 407 if (!isSecurityEnabled) return null; 408 if (server == null) return null; 409 Configuration conf = server.getConfiguration(); 410 long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000); 411 long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000); 412 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), 413 server.getServerName().toString(), keyUpdateInterval, maxAge); 414 } 415 416 public SecretManager<? 
extends TokenIdentifier> getSecretManager() { 417 return this.secretManager; 418 } 419 420 @SuppressWarnings("unchecked") 421 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { 422 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; 423 } 424 425 /** 426 * This is a server side method, which is invoked over RPC. On success the return response has 427 * protobuf response payload. On failure, the exception name and the stack trace are returned in 428 * the protobuf response. 429 */ 430 @Override 431 public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status) 432 throws IOException { 433 try { 434 MethodDescriptor md = call.getMethod(); 435 Message param = call.getParam(); 436 status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime()); 437 // TODO: Review after we add in encoded data blocks. 438 status.setRPCPacket(param); 439 status.resume("Servicing call"); 440 // get an instance of the method arg type 441 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); 442 controller.setCallTimeout(call.getTimeout()); 443 Message result = call.getService().callBlockingMethod(md, controller, param); 444 long receiveTime = call.getReceiveTime(); 445 long startTime = call.getStartTime(); 446 long endTime = EnvironmentEdgeManager.currentTime(); 447 int processingTime = (int) (endTime - startTime); 448 int qTime = (int) (startTime - receiveTime); 449 int totalTime = (int) (endTime - receiveTime); 450 if (LOG.isTraceEnabled()) { 451 LOG.trace( 452 "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, " 453 + "totalTime: {}, fsReadTime: {}", 454 CurCall.get().toString(), TextFormat.shortDebugString(result), 455 CurCall.get().getReceiveTime(), qTime, processingTime, totalTime, 456 CurCall.get().getFsReadTime()); 457 } 458 // Use the raw request call size for now. 
459 long requestSize = call.getSize(); 460 long responseSize = result.getSerializedSize(); 461 long responseBlockSize = call.getBlockBytesScanned(); 462 long fsReadTime = call.getFsReadTime(); 463 if (call.isClientCellBlockSupported()) { 464 // Include the payload size in HBaseRpcController 465 responseSize += call.getResponseCellSize(); 466 } 467 468 metrics.dequeuedCall(qTime); 469 metrics.processedCall(processingTime); 470 metrics.totalCall(totalTime); 471 metrics.receivedRequest(requestSize); 472 metrics.sentResponse(responseSize); 473 // log any RPC responses that are slower than the configured warn 474 // response time or larger than configured warning size 475 boolean tooSlow = isTooSlow(call, processingTime); 476 boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize); 477 if (tooSlow || tooLarge) { 478 final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY); 479 // when tagging, we let TooLarge trump TooSmall to keep output simple 480 // note that large responses will often also be slow. 481 logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")", 482 tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, 483 responseBlockSize, fsReadTime, userName); 484 if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { 485 // send logs to ring buffer owned by slowLogRecorder 486 final String className = 487 server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 488 this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(), 489 responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge)); 490 } 491 } 492 return new Pair<>(result, controller.cellScanner()); 493 } catch (Throwable e) { 494 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before 495 // putting it on the wire. 
Its needed to adhere to the pb Service Interface but we don't 496 // need to pass it over the wire. 497 if (e instanceof ServiceException) { 498 if (e.getCause() == null) { 499 LOG.debug("Caught a ServiceException with null cause", e); 500 } else { 501 e = e.getCause(); 502 } 503 } 504 505 // increment the number of requests that were exceptions. 506 metrics.exception(e); 507 508 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); 509 if (e instanceof IOException) throw (IOException) e; 510 LOG.error("Unexpected throwable object ", e); 511 throw new IOException(e.getMessage(), e); 512 } 513 } 514 515 /** 516 * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations. 517 * @param param The parameters received in the call. 518 * @param methodName The name of the method invoked 519 * @param call The string representation of the call 520 * @param tooLarge To indicate if the event is tooLarge 521 * @param tooSlow To indicate if the event is tooSlow 522 * @param clientAddress The address of the client who made this call. 523 * @param startTime The time that the call was initiated, in ms. 524 * @param processingTime The duration that the call took to run, in ms. 525 * @param qTime The duration that the call spent on the queue prior to being 526 * initiated, in ms. 527 * @param responseSize The size in bytes of the response buffer. 528 * @param blockBytesScanned The size of block bytes scanned to retrieve the response. 529 * @param userName UserName of the current RPC Call 530 */ 531 void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow, 532 String clientAddress, long startTime, int processingTime, int qTime, long responseSize, 533 long blockBytesScanned, long fsReadTime, String userName) { 534 final String className = server == null ? 
StringUtils.EMPTY : server.getClass().getSimpleName(); 535 // base information that is reported regardless of type of call 536 Map<String, Object> responseInfo = new HashMap<>(); 537 responseInfo.put("starttimems", startTime); 538 responseInfo.put("processingtimems", processingTime); 539 responseInfo.put("queuetimems", qTime); 540 responseInfo.put("responsesize", responseSize); 541 responseInfo.put("blockbytesscanned", blockBytesScanned); 542 responseInfo.put("fsreadtime", fsReadTime); 543 responseInfo.put("client", clientAddress); 544 responseInfo.put("class", className); 545 responseInfo.put("method", methodName); 546 responseInfo.put("call", call); 547 // The params could be really big, make sure they don't kill us at WARN 548 String stringifiedParam = ProtobufUtil.getShortTextFormat(param); 549 if (stringifiedParam.length() > 150) { 550 // Truncate to 1000 chars if TRACE is on, else to 150 chars 551 stringifiedParam = truncateTraceLog(stringifiedParam); 552 } 553 responseInfo.put("param", stringifiedParam); 554 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { 555 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); 556 String scanDetails; 557 if (request.hasScannerId()) { 558 long scannerId = request.getScannerId(); 559 scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); 560 } else { 561 scanDetails = rsRpcServices.getScanDetailsWithRequest(request); 562 } 563 if (scanDetails != null) { 564 responseInfo.put("scandetails", scanDetails); 565 } 566 } 567 if (param instanceof ClientProtos.MultiRequest) { 568 int numGets = 0; 569 int numMutations = 0; 570 int numServiceCalls = 0; 571 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 572 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 573 for (ClientProtos.Action action : regionAction.getActionList()) { 574 if (action.hasMutation()) { 575 numMutations++; 576 } 577 if (action.hasGet()) { 578 numGets++; 579 } 580 if 
(action.hasServiceCall()) { 581 numServiceCalls++; 582 } 583 } 584 } 585 responseInfo.put(MULTI_GETS, numGets); 586 responseInfo.put(MULTI_MUTATIONS, numMutations); 587 responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls); 588 } 589 final String tag = 590 (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge"); 591 LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo)); 592 } 593 594 private boolean isTooSlow(RpcCall call, int processingTime) { 595 long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest 596 ? warnScanResponseTime 597 : this.warnResponseTime; 598 return (processingTime > warnResponseTime && warnResponseTime > -1); 599 } 600 601 private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) { 602 long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest 603 ? warnScanResponseSize 604 : this.warnResponseSize; 605 return (warnResponseSize > -1 606 && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize)); 607 } 608 609 /** 610 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else 611 * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 612 * @param strParam stringifiedParam to be truncated 613 * @return truncated trace log string 614 */ 615 String truncateTraceLog(String strParam) { 616 if (LOG.isTraceEnabled()) { 617 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH); 618 int truncatedLength = 619 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength; 620 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED; 621 return strParam.subSequence(0, truncatedLength) + truncatedFlag; 622 } 623 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED; 624 } 625 626 /** 627 * Set the handler for calling out of RPC for error conditions. 
628 * @param handler the handler implementation 629 */ 630 @Override 631 public void setErrorHandler(HBaseRPCErrorHandler handler) { 632 this.errorHandler = handler; 633 } 634 635 @Override 636 public HBaseRPCErrorHandler getErrorHandler() { 637 return this.errorHandler; 638 } 639 640 /** 641 * Returns the metrics instance for reporting RPC call statistics 642 */ 643 @Override 644 public MetricsHBaseServer getMetrics() { 645 return metrics; 646 } 647 648 @Override 649 public void addCallSize(final long diff) { 650 this.callQueueSizeInBytes.add(diff); 651 } 652 653 /** 654 * Authorize the incoming client connection. 655 * @param user client user 656 * @param connection incoming connection 657 * @param addr InetAddress of incoming connection 658 * @throws AuthorizationException when the client isn't authorized to talk the protocol 659 */ 660 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, 661 InetAddress addr) throws AuthorizationException { 662 if (authorize) { 663 Class<?> c = getServiceInterface(services, connection.getServiceName()); 664 authManager.authorize(user, c, getConf(), addr); 665 } 666 } 667 668 /** 669 * When the read or write buffer size is larger than this limit, i/o will be done in chunks of 670 * this size. Most RPC requests and responses would be be smaller. 671 */ 672 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 673 674 /** 675 * This is a wrapper around 676 * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data 677 * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many 678 * direct buffers as the size of ByteBuffer increases. There should not be any performance 679 * degredation. 
680 * @param channel writable byte channel to write on 681 * @param buffer buffer to write 682 * @return number of bytes written 683 * @throws java.io.IOException e 684 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) 685 */ 686 protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { 687 688 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) 689 ? channel.read(buffer) 690 : channelIO(channel, null, buffer); 691 if (count > 0) { 692 metrics.receivedBytes(count); 693 } 694 return count; 695 } 696 697 /** 698 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. 699 * Only one of readCh or writeCh should be non-null. 700 * @param readCh read channel 701 * @param writeCh write channel 702 * @param buf buffer to read or write into/out of 703 * @return bytes written 704 * @throws java.io.IOException e 705 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) 706 */ 707 private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, 708 ByteBuffer buf) throws IOException { 709 710 int originalLimit = buf.limit(); 711 int initialRemaining = buf.remaining(); 712 int ret = 0; 713 714 while (buf.remaining() > 0) { 715 try { 716 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 717 buf.limit(buf.position() + ioSize); 718 719 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 720 721 if (ret < ioSize) { 722 break; 723 } 724 725 } finally { 726 buf.limit(originalLimit); 727 } 728 } 729 730 int nBytes = initialRemaining - buf.remaining(); 731 return (nBytes > 0) ? nBytes : ret; 732 } 733 734 /** 735 * Needed for features such as delayed calls. We need to be able to store the current call so that 736 * we can complete it later or ask questions of what is supported by the current ongoing call. 
737 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) 738 */ 739 public static Optional<RpcCall> getCurrentCall() { 740 return Optional.ofNullable(CurCall.get()); 741 } 742 743 public static boolean isInRpcCallContext() { 744 return CurCall.get() != null; 745 } 746 747 /** 748 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For 749 * master's rpc call, it may generate new procedure and mutate the region which store procedure. 750 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc 751 * call to avoid the rpc check. 752 * @return the currently ongoing rpc call 753 */ 754 public static Optional<RpcCall> unsetCurrentCall() { 755 Optional<RpcCall> rpcCall = getCurrentCall(); 756 CurCall.set(null); 757 return rpcCall; 758 } 759 760 /** 761 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the 762 * rpc call back after mutate region. 763 */ 764 public static void setCurrentCall(RpcCall rpcCall) { 765 CurCall.set(rpcCall); 766 } 767 768 /** 769 * Returns the user credentials associated with the current RPC request or not present if no 770 * credentials were provided. 771 * @return A User 772 */ 773 public static Optional<User> getRequestUser() { 774 Optional<RpcCall> ctx = getCurrentCall(); 775 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); 776 } 777 778 /** 779 * The number of open RPC conections 780 * @return the number of open rpc connections 781 */ 782 abstract public int getNumOpenConnections(); 783 784 /** 785 * Returns the username for any user associated with the current RPC request or not present if no 786 * user is set. 
*/
  public static Optional<String> getRequestUserName() {
    final Optional<User> requester = getRequestUser();
    return requester.map(user -> user.getShortName());
  }

  /**
   * Returns the address of the remote client associated with the current RPC request or not present
   * if no address is set.
   */
  public static Optional<InetAddress> getRemoteAddress() {
    final Optional<RpcCall> current = getCurrentCall();
    return current.map(call -> call.getRemoteAddress());
  }

  /**
   * Looks up a service by the name of its blocking service's descriptor.
   * @param services    Available service instances
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @return Matching BlockingServiceAndInterface pair, or null when no name matches
   */
  protected static BlockingServiceAndInterface getServiceAndInterface(
    final List<BlockingServiceAndInterface> services, final String serviceName) {
    for (BlockingServiceAndInterface candidate : services) {
      final String candidateName =
        candidate.getBlockingService().getDescriptorForType().getName();
      if (candidateName.equals(serviceName)) {
        return candidate;
      }
    }
    return null;
  }

  /**
   * @param services    Available services and their service interfaces.
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @return Service interface class for <code>serviceName</code>, or null when no service matches
   */
  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
    final String serviceName) {
    final BlockingServiceAndInterface match = getServiceAndInterface(services, serviceName);
    if (match == null) {
      return null;
    }
    return match.getServiceInterface();
  }

  /**
   * @param services    Available services and their service interfaces.
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @return BlockingService that goes with the passed <code>serviceName</code>, or null when no
   *         service matches
   */
  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
    final String serviceName) {
    final BlockingServiceAndInterface match = getServiceAndInterface(services, serviceName);
    if (match == null) {
      return null;
    }
    return match.getBlockingService();
  }

  /**
   * Returns the status object held in {@code RpcServer.MONITORED_RPC}, creating one named after the
   * current thread, pausing it with "Waiting for a call" and storing it when none is held yet.
   */
  protected static MonitoredRPCHandler getStatus() {
    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
    MonitoredRPCHandler handlerStatus = RpcServer.MONITORED_RPC.get();
    if (handlerStatus == null) {
      handlerStatus = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
      handlerStatus.pause("Waiting for a call");
      RpcServer.MONITORED_RPC.set(handlerStatus);
    }
    return handlerStatus;
  }

  /**
   * Returns the remote side ip address when invoked inside an RPC. Returns null when there is no
   * current call.
   */
  public static InetAddress getRemoteIp() {
    final RpcCall current = CurCall.get();
    return current == null ? null : current.getRemoteAddress();
  }

  /** Returns the scheduler held by this server. */
  @Override
  public RpcScheduler getScheduler() {
    return this.scheduler;
  }

  /** Returns the byte buffer allocator held by this server. */
  @Override
  public ByteBuffAllocator getByteBuffAllocator() {
    return bbAllocator;
  }

  /** Stores the given RSRpcServices reference on this server. */
  @Override
  public void setRsRpcServices(RSRpcServices rsRpcServices) {
    this.rsRpcServices = rsRpcServices;
  }

  /** Stores the given NamedQueueRecorder reference on this server. */
  @Override
  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
    this.namedQueueRecorder = namedQueueRecorder;
  }

  /** Returns the value of the {@code authorize} flag. */
  protected boolean needAuthorization() {
    return this.authorize;
  }

  /** Returns the services held by this server; restricted to test sources by the annotation. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public List<BlockingServiceAndInterface> getServices() {
    return this.services;
  }
}