/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.exceptions.X509Exception;
import org.apache.hadoop.hbase.io.FileChangeWatcher;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
071import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 072import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler; 073import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; 074import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; 075import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 076 077/** 078 * An RPC server with Netty4 implementation. 079 * @since 2.0.0 080 */ 081@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 082public class NettyRpcServer extends RpcServer { 083 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class); 084 085 /** 086 * Name of property to change the byte buf allocator for the netty channels. Default is no value, 087 * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", 088 * and "heap", or, the name of a class implementing ByteBufAllocator. 089 * <p> 090 * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is 091 * controlled by platform specific code and documented system properties. 092 * <p> 093 * "heap" will prefer heap arena allocations. 094 */ 095 public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; 096 static final String POOLED_ALLOCATOR_TYPE = "pooled"; 097 static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; 098 static final String HEAP_ALLOCATOR_TYPE = "heap"; 099 100 /** 101 * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was 102 * exceeded, channel will have setAutoRead to true again. The server will start reading incoming 103 * bytes (requests) from the client channel. 
104 */ 105 public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY = 106 "hbase.server.netty.writable.watermark.low"; 107 private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0; 108 109 /** 110 * High watermark for pending outbound bytes of a single netty channel. If the number of pending 111 * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server 112 * will stop reading incoming requests from the client channel. 113 * <p> 114 * Note: any requests already in the call queue will still be processed. 115 */ 116 public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY = 117 "hbase.server.netty.writable.watermark.high"; 118 private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0; 119 120 /** 121 * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending 122 * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory 123 * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight 124 * requests. 125 * <p> 126 * Note: must be higher than the high watermark, otherwise it's ignored. 
127 */ 128 public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY = 129 "hbase.server.netty.writable.watermark.fatal"; 130 private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0; 131 132 private final InetSocketAddress bindAddress; 133 134 private final CountDownLatch closed = new CountDownLatch(1); 135 private final Channel serverChannel; 136 final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); 137 private final ByteBufAllocator channelAllocator; 138 private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>(); 139 private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>(); 140 private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>(); 141 142 private volatile int writeBufferFatalThreshold; 143 private volatile WriteBufferWaterMark writeBufferWaterMark; 144 145 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, 146 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, 147 boolean reservoirEnabled) throws IOException { 148 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 149 this.bindAddress = bindAddress; 150 this.channelAllocator = getChannelAllocator(conf); 151 // Get the event loop group configuration from the server class if available. 152 NettyEventLoopGroupConfig config = null; 153 if (server instanceof HRegionServer) { 154 config = ((HRegionServer) server).getEventLoopGroupConfig(); 155 } 156 if (config == null) { 157 config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer"); 158 } 159 160 // call before creating bootstrap below so that the necessary configs can be set 161 configureNettyWatermarks(conf); 162 163 EventLoopGroup eventLoopGroup = config.group(); 164 Class<? 
extends ServerChannel> channelClass = config.serverChannelClass(); 165 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) 166 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) 167 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) 168 .childOption(ChannelOption.SO_REUSEADDR, true) 169 .childHandler(new ChannelInitializer<Channel>() { 170 @Override 171 protected void initChannel(Channel ch) throws Exception { 172 ch.config().setWriteBufferWaterMark(writeBufferWaterMark); 173 ch.config().setAllocator(channelAllocator); 174 ChannelPipeline pipeline = ch.pipeline(); 175 176 NettyServerRpcConnection conn = createNettyServerRpcConnection(ch); 177 178 if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) { 179 initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true)); 180 } 181 pipeline 182 .addLast(NettyRpcServerPreambleHandler.DECODER_NAME, 183 NettyRpcServerPreambleHandler.createDecoder()) 184 .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn)) 185 // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may 186 // send RpcResponse to client. 187 .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics)) 188 // Add writability handler after the response encoder, so we can abort writes before 189 // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so 190 // that the handler configs can be live updated via update_config. 
191 .addLast(NettyRpcServerChannelWritabilityHandler.NAME, 192 new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold, 193 () -> isWritabilityBackpressureEnabled())); 194 } 195 }); 196 try { 197 serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); 198 LOG.info("Bind to {}", serverChannel.localAddress()); 199 } catch (InterruptedException e) { 200 throw new InterruptedIOException(e.getMessage()); 201 } 202 initReconfigurable(conf); 203 this.scheduler.init(new RpcSchedulerContext(this)); 204 } 205 206 @Override 207 public void onConfigurationChange(Configuration newConf) { 208 super.onConfigurationChange(newConf); 209 configureNettyWatermarks(newConf); 210 } 211 212 private void configureNettyWatermarks(Configuration conf) { 213 int watermarkLow = 214 conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT); 215 int watermarkHigh = 216 conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT); 217 int fatalThreshold = 218 conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT); 219 220 WriteBufferWaterMark oldWaterMark = writeBufferWaterMark; 221 int oldFatalThreshold = writeBufferFatalThreshold; 222 223 boolean disabled = false; 224 if (watermarkHigh == 0 && watermarkLow == 0) { 225 // if both are 0, use the netty default, which we will treat as "disabled". 226 // when disabled, we won't manage autoRead in response to writability changes. 227 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 228 disabled = true; 229 } else { 230 // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to 231 // 1 to avoid confusing behavior. 
232 if (watermarkLow == 0) { 233 LOG.warn( 234 "Detected a {} value of 0, which is impossible to achieve " 235 + "due to how netty evaluates these thresholds, setting to 1", 236 CHANNEL_WRITABLE_LOW_WATERMARK_KEY); 237 watermarkLow = 1; 238 } 239 240 // netty validates the watermarks and throws an exception if high < low, fail more gracefully 241 // by disabling the watermarks and warning. 242 if (watermarkHigh <= watermarkLow) { 243 LOG.warn( 244 "Detected {} value {}, lower than {} value {}. This will fail netty validation, " 245 + "so disabling", 246 CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 247 watermarkLow); 248 writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; 249 } else { 250 writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh); 251 } 252 253 // only apply this check when watermark is enabled. this way we give the operator some 254 // flexibility if they want to try enabling fatal threshold without backpressure. 255 if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) { 256 LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.", 257 CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 258 watermarkHigh); 259 fatalThreshold = 0; 260 } 261 } 262 263 writeBufferFatalThreshold = fatalThreshold; 264 265 if ( 266 oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low() 267 || oldWaterMark.high() != writeBufferWaterMark.high() 268 || oldFatalThreshold != writeBufferFatalThreshold) 269 ) { 270 LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}", 271 disabled ? "disabled" : writeBufferWaterMark.low(), 272 disabled ? "disabled" : writeBufferWaterMark.high(), 273 writeBufferFatalThreshold <= 0 ? 
"disabled" : writeBufferFatalThreshold); 274 } 275 276 // update any existing channels 277 for (Channel channel : allChannels) { 278 channel.config().setWriteBufferWaterMark(writeBufferWaterMark); 279 // if disabling watermark, set auto read to true in case channel had been exceeding 280 // previous watermark 281 if (disabled) { 282 channel.config().setAutoRead(true); 283 } 284 } 285 } 286 287 public boolean isWritabilityBackpressureEnabled() { 288 return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT; 289 } 290 291 private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { 292 final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); 293 if (value != null) { 294 if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 295 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 296 return PooledByteBufAllocator.DEFAULT; 297 } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 298 LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); 299 return UnpooledByteBufAllocator.DEFAULT; 300 } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) { 301 LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); 302 return HeapByteBufAllocator.DEFAULT; 303 } else { 304 // If the value is none of the recognized labels, treat it as a class name. This allows the 305 // user to supply a custom implementation, perhaps for debugging. 306 try { 307 // ReflectionUtils throws UnsupportedOperationException if there are any problems. 
308 ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); 309 LOG.info("Using {} for buffer allocation", value); 310 return alloc; 311 } catch (ClassCastException | UnsupportedOperationException e) { 312 throw new IOException(e); 313 } 314 } 315 } else { 316 LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); 317 return PooledByteBufAllocator.DEFAULT; 318 } 319 } 320 321 // will be overridden in tests 322 @InterfaceAudience.Private 323 protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { 324 return new NettyServerRpcConnection(NettyRpcServer.this, channel); 325 } 326 327 @Override 328 public synchronized void start() { 329 if (started) { 330 return; 331 } 332 authTokenSecretMgr = createSecretManager(); 333 if (authTokenSecretMgr != null) { 334 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 335 // LeaderElector start. See HBASE-25875 336 synchronized (authTokenSecretMgr) { 337 setSecretManager(authTokenSecretMgr); 338 authTokenSecretMgr.start(); 339 } 340 } 341 this.authManager = new ServiceAuthorizationManager(); 342 HBasePolicyProvider.init(conf, authManager); 343 scheduler.start(); 344 started = true; 345 } 346 347 @Override 348 public synchronized void stop() { 349 if (!running) { 350 return; 351 } 352 LOG.info("Stopping server on " + this.serverChannel.localAddress()); 353 FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); 354 if (ks != null) { 355 ks.stop(); 356 } 357 FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); 358 if (ts != null) { 359 ts.stop(); 360 } 361 if (authTokenSecretMgr != null) { 362 authTokenSecretMgr.stop(); 363 authTokenSecretMgr = null; 364 } 365 allChannels.close().awaitUninterruptibly(); 366 serverChannel.close(); 367 scheduler.stop(); 368 closed.countDown(); 369 running = false; 370 } 371 372 @Override 373 public synchronized void join() throws InterruptedException { 374 closed.await(); 375 } 
376 377 @Override 378 public synchronized InetSocketAddress getListenerAddress() { 379 return ((InetSocketAddress) serverChannel.localAddress()); 380 } 381 382 @Override 383 public void setSocketSendBufSize(int size) { 384 } 385 386 @Override 387 public int getNumOpenConnections() { 388 return allChannels.size(); 389 } 390 391 @Override 392 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 393 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) 394 throws IOException { 395 return call(service, md, param, cellScanner, receiveTime, status, 396 EnvironmentEdgeManager.currentTime(), 0); 397 } 398 399 @Override 400 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 401 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, 402 long startTime, int timeout) throws IOException { 403 NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, 404 -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null); 405 return call(fakeCall, status); 406 } 407 408 private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext) 409 throws X509Exception, IOException { 410 SslContext nettySslContext = getSslContext(); 411 412 if (supportPlaintext) { 413 p.addLast("ssl", new OptionalSslHandler(nettySslContext)); 414 LOG.debug("Dual mode SSL handler added for channel: {}", p.channel()); 415 } else { 416 SocketAddress remoteAddress = p.channel().remoteAddress(); 417 SslHandler sslHandler; 418 419 if (remoteAddress instanceof InetSocketAddress) { 420 InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress; 421 String host; 422 423 if (conf.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true)) { 424 host = remoteInetAddress.getHostName(); 425 } else { 426 host = remoteInetAddress.getHostString(); 427 } 428 429 int port = remoteInetAddress.getPort(); 430 431 /* 432 
* our HostnameVerifier gets the host name from SSLEngine, so we have to construct the 433 * engine properly by passing the remote address 434 */ 435 sslHandler = nettySslContext.newHandler(p.channel().alloc(), host, port); 436 } else { 437 sslHandler = nettySslContext.newHandler(p.channel().alloc()); 438 } 439 440 sslHandler.setWrapDataSize( 441 conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE)); 442 443 sslHandler.handshakeFuture() 444 .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress)); 445 446 p.addLast("ssl", sslHandler); 447 LOG.debug("SSL handler added for channel: {}", p.channel()); 448 } 449 } 450 451 static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler, 452 SocketAddress remoteAddress) { 453 try { 454 Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates(); 455 if (certificates != null && certificates.length > 0) { 456 X509Certificate[] x509Certificates = new X509Certificate[certificates.length]; 457 for (int i = 0; i < x509Certificates.length; i++) { 458 x509Certificates[i] = (X509Certificate) certificates[i]; 459 } 460 conn.clientCertificateChain = x509Certificates; 461 } else if (sslHandler.engine().getNeedClientAuth()) { 462 LOG.debug( 463 "Could not get peer certificate on TLS connection from {}, although one is required", 464 remoteAddress); 465 } 466 } catch (SSLPeerUnverifiedException e) { 467 if (sslHandler.engine().getNeedClientAuth()) { 468 LOG.debug( 469 "Could not get peer certificate on TLS connection from {}, although one is required", 470 remoteAddress, e); 471 } 472 } catch (Exception e) { 473 LOG.debug("Unexpected error getting peer certificate for TLS connection from {}", 474 remoteAddress, e); 475 } 476 } 477 478 SslContext getSslContext() throws X509Exception, IOException { 479 SslContext result = sslContextForServer.get(); 480 if (result == null) { 481 result = 
X509Util.createSslContextForServer(conf); 482 if (!sslContextForServer.compareAndSet(null, result)) { 483 // lost the race, another thread already set the value 484 result = sslContextForServer.get(); 485 } else if ( 486 keyStoreWatcher.get() == null && trustStoreWatcher.get() == null 487 && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) 488 ) { 489 X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, 490 () -> sslContextForServer.set(null)); 491 } 492 } 493 return result; 494 } 495 496 public int getWriteBufferFatalThreshold() { 497 return writeBufferFatalThreshold; 498 } 499 500 public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() { 501 long total = 0; 502 long max = 0; 503 for (Channel channel : allChannels) { 504 long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel); 505 total += outbound; 506 max = Math.max(max, outbound); 507 } 508 return Pair.newPair(total, max); 509 } 510}