001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT; 021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY; 022import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY; 023import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 024import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT; 025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY; 026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 027import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR; 028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE; 029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT; 030import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT; 031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY; 032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 033import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY; 034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT; 035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY; 036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT; 037import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 038import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION; 039import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT; 040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY; 041import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION; 042import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION; 043import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION; 044import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY; 045import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 046import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION; 047import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION; 048import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY; 049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY; 050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS; 051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD; 052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT; 053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS; 054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT; 055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT; 056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT; 057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY; 058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY; 059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY; 060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM; 061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT; 062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY; 063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY; 064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY; 065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY; 066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY; 067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY; 068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY; 069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY; 070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY; 071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY; 072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY; 073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_DEFAULT; 074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_KEY; 075import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY; 076import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY; 077 078import java.io.IOException; 079import java.net.InetAddress; 080import java.net.InetSocketAddress; 081import java.net.UnknownHostException; 082import java.security.PrivilegedAction; 083import java.util.List; 084import java.util.Map; 085import java.util.concurrent.BlockingQueue; 086import java.util.concurrent.ExecutorService; 087import java.util.concurrent.LinkedBlockingQueue; 088import java.util.concurrent.ThreadPoolExecutor; 089import java.util.concurrent.TimeUnit; 090import javax.security.auth.callback.Callback; 091import javax.security.auth.callback.UnsupportedCallbackException; 092import javax.security.sasl.AuthorizeCallback; 093import javax.security.sasl.SaslServer; 094import org.apache.commons.lang3.ArrayUtils; 095import org.apache.hadoop.conf.Configuration; 096import org.apache.hadoop.conf.Configured; 097import org.apache.hadoop.hbase.HBaseConfiguration; 098import org.apache.hadoop.hbase.HBaseInterfaceAudience; 099import org.apache.hadoop.hbase.filter.ParseFilter; 100import org.apache.hadoop.hbase.http.HttpServerUtil; 101import org.apache.hadoop.hbase.http.InfoServer; 102import org.apache.hadoop.hbase.log.HBaseMarkers; 103import org.apache.hadoop.hbase.security.SaslUtil; 104import org.apache.hadoop.hbase.security.SecurityUtil; 105import org.apache.hadoop.hbase.security.UserProvider; 106import org.apache.hadoop.hbase.thrift.generated.Hbase; 107import org.apache.hadoop.hbase.util.DNS; 108import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 109import org.apache.hadoop.hbase.util.JvmPauseMonitor; 110import org.apache.hadoop.hbase.util.Strings; 111import org.apache.hadoop.hbase.util.VersionInfo; 112import org.apache.hadoop.security.SaslRpcServer; 113import org.apache.hadoop.security.UserGroupInformation; 114import org.apache.hadoop.security.authorize.ProxyUsers; 115import org.apache.hadoop.util.Shell.ExitCodeException; 116import org.apache.hadoop.util.Tool; 117import org.apache.hadoop.util.ToolRunner; 118import org.apache.thrift.TProcessor; 119import org.apache.thrift.protocol.TBinaryProtocol; 120import org.apache.thrift.protocol.TCompactProtocol; 121import org.apache.thrift.protocol.TProtocolFactory; 122import org.apache.thrift.server.THsHaServer; 123import org.apache.thrift.server.TNonblockingServer; 124import org.apache.thrift.server.TServer; 125import org.apache.thrift.server.TServlet; 126import org.apache.thrift.server.TThreadedSelectorServer; 127import org.apache.thrift.transport.TNonblockingServerSocket; 128import org.apache.thrift.transport.TNonblockingServerTransport; 129import org.apache.thrift.transport.TSaslServerTransport; 130import org.apache.thrift.transport.TServerSocket; 131import org.apache.thrift.transport.TServerTransport; 132import org.apache.thrift.transport.TTransportFactory; 133import org.apache.thrift.transport.layered.TFramedTransport; 134import org.apache.yetus.audience.InterfaceAudience; 135import org.slf4j.Logger; 136import org.slf4j.LoggerFactory; 137 138import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 139import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 140import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 141import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 142import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 143import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 144import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 145import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 146import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion; 147import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration; 148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory; 149import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer; 150import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; 151import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; 152import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory; 153import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler; 154import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; 155import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory; 156import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool; 157 158/** 159 * ThriftServer- this class starts up a Thrift server which implements the Hbase API specified in 160 * the Hbase.thrift IDL file. The server runs in an independent process. 161 */ 162@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 163public class ThriftServer extends Configured implements Tool { 164 165 private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class); 166 167 protected Configuration conf; 168 169 protected InfoServer infoServer; 170 171 protected TProcessor processor; 172 173 protected ThriftMetrics metrics; 174 protected HBaseServiceHandler hbaseServiceHandler; 175 protected UserGroupInformation serviceUGI; 176 protected UserGroupInformation httpUGI; 177 protected boolean httpEnabled; 178 179 protected SaslUtil.QualityOfProtection qop; 180 protected String host; 181 protected int listenPort; 182 183 protected boolean securityEnabled; 184 protected boolean doAsEnabled; 185 186 protected JvmPauseMonitor pauseMonitor; 187 188 protected volatile TServer tserver; 189 protected volatile Server httpServer; 190 191 // 192 // Main program and support routines 193 // 194 195 public ThriftServer(Configuration conf) { 196 this.conf = HBaseConfiguration.create(conf); 197 } 198 199 protected ThriftMetrics createThriftMetrics(Configuration conf) { 200 return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); 201 } 202 203 protected void setupParamters() throws IOException { 204 // login the server principal (if using secure Hadoop) 205 UserProvider userProvider = UserProvider.instantiate(conf); 206 securityEnabled = 207 userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled(); 208 if (securityEnabled) { 209 host = Strings.domainNamePointerToHostName( 210 DNS.getDefaultHost(conf.get(THRIFT_DNS_INTERFACE_KEY, "default"), 211 conf.get(THRIFT_DNS_NAMESERVER_KEY, "default"))); 212 userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host); 213 214 // Setup the SPNEGO user for HTTP if configured 215 String spnegoPrincipal = getSpengoPrincipal(conf, host); 216 String spnegoKeytab = getSpnegoKeytab(conf); 217 UserGroupInformation.setConfiguration(conf); 218 // login the SPNEGO principal using UGI to avoid polluting the login user 219 this.httpUGI = 220 UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, spnegoKeytab); 221 } 222 this.serviceUGI = userProvider.getCurrent().getUGI(); 223 if (httpUGI == null) { 224 this.httpUGI = serviceUGI; 225 } 226 227 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); 228 this.metrics = createThriftMetrics(conf); 229 this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); 230 this.hbaseServiceHandler = createHandler(conf, userProvider); 231 this.hbaseServiceHandler.initMetrics(metrics); 232 this.processor = createProcessor(); 233 234 httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false); 235 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false); 236 if (doAsEnabled && !httpEnabled) { 237 LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured"); 238 } 239 240 String strQop = conf.get(THRIFT_QOP_KEY); 241 if (strQop != null) { 242 this.qop = SaslUtil.getQop(strQop); 243 } 244 if (qop != null) { 245 if ( 246 qop != SaslUtil.QualityOfProtection.AUTHENTICATION 247 && qop != SaslUtil.QualityOfProtection.INTEGRITY 248 && qop != SaslUtil.QualityOfProtection.PRIVACY 249 ) { 250 throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.", 251 THRIFT_QOP_KEY, SaslUtil.QualityOfProtection.AUTHENTICATION.name(), 252 SaslUtil.QualityOfProtection.INTEGRITY.name(), 253 SaslUtil.QualityOfProtection.PRIVACY.name())); 254 } 255 checkHttpSecurity(qop, conf); 256 if (!securityEnabled) { 257 throw new IOException("Thrift server must run in secure mode to support authentication"); 258 } 259 } 260 registerFilters(conf); 261 pauseMonitor.start(); 262 } 263 264 private String getSpengoPrincipal(Configuration conf, String host) throws IOException { 265 String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY); 266 if (principal == null) { 267 // We cannot use the Hadoop configuration deprecation handling here since 268 // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos 269 // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY 270 // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend 271 // Kerberos principal and SPNEGO principal. 272 LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.", 273 THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY); 274 principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 275 } 276 // Handle _HOST in principal value 277 return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host); 278 } 279 280 private String getSpnegoKeytab(Configuration conf) { 281 String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY); 282 if (keytab == null) { 283 // We cannot use the Hadoop configuration deprecation handling here since 284 // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos 285 // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY 286 // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend 287 // Kerberos keytab and SPNEGO keytab. 288 LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.", 289 THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY); 290 keytab = conf.get(THRIFT_KEYTAB_FILE_KEY); 291 } 292 return keytab; 293 } 294 295 protected void startInfoServer() throws IOException { 296 // Put up info server. 297 int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT); 298 299 if (port >= 0) { 300 conf.setLong("startcode", EnvironmentEdgeManager.currentTime()); 301 String a = 302 conf.get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT); 303 infoServer = new InfoServer("thrift", a, port, false, conf); 304 infoServer.setAttribute("hbase.conf", conf); 305 infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name()); 306 infoServer.start(); 307 } 308 } 309 310 protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) { 311 if ( 312 qop == SaslUtil.QualityOfProtection.PRIVACY && conf.getBoolean(USE_HTTP_CONF_KEY, false) 313 && !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false) 314 ) { 315 throw new IllegalArgumentException( 316 "Thrift HTTP Server's QoP is privacy, but " + THRIFT_SSL_ENABLED_KEY + " is false"); 317 } 318 } 319 320 protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider) 321 throws IOException { 322 return new ThriftHBaseServiceHandler(conf, userProvider); 323 } 324 325 protected TProcessor createProcessor() { 326 return new Hbase.Processor<>( 327 HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf)); 328 } 329 330 /** 331 * the thrift server, not null means the server is started, for test only 332 * @return the tServer 333 */ 334 @InterfaceAudience.Private 335 public TServer getTserver() { 336 return tserver; 337 } 338 339 /** 340 * the Jetty server, not null means the HTTP server is started, for test only 341 * @return the http server 342 */ 343 @InterfaceAudience.Private 344 public Server getHttpServer() { 345 return httpServer; 346 } 347 348 protected void printUsageAndExit(Options options, int exitCode) throws ExitCodeException { 349 HelpFormatter formatter = new HelpFormatter(); 350 formatter.printHelp("Thrift", null, options, 351 "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + "'hbase thrift'\n" 352 + "To shutdown the thrift server run 'hbase-daemon.sh stop " 353 + "thrift' or send a kill signal to the thrift server pid", 354 true); 355 throw new ExitCodeException(exitCode, ""); 356 } 357 358 /** 359 * Create a Servlet for the http server 360 * @param protocolFactory protocolFactory 361 * @return the servlet 362 */ 363 protected TServlet createTServlet(TProtocolFactory protocolFactory) { 364 return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI, 365 hbaseServiceHandler, securityEnabled, doAsEnabled); 366 } 367 368 /** 369 * Setup an HTTP Server using Jetty to serve calls from THttpClient 370 * @throws IOException IOException 371 */ 372 protected void setupHTTPServer() throws IOException { 373 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 374 TServlet thriftHttpServlet = createTServlet(protocolFactory); 375 376 // Set the default max thread number to 100 to limit 377 // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. 378 // Jetty set the default max thread number to 250, if we don't set it. 379 // 380 // Our default min thread number 2 is the same as that used by Jetty. 381 int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, conf 382 .getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY, HTTP_MIN_THREADS_KEY_DEFAULT)); 383 int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, conf 384 .getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, HTTP_MAX_THREADS_KEY_DEFAULT)); 385 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads); 386 threadPool.setMinThreads(minThreads); 387 httpServer = new Server(threadPool); 388 389 // Context handler 390 ServletContextHandler ctxHandler = 391 new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS); 392 ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*"); 393 HttpServerUtil.constrainHttpMethods(ctxHandler, 394 conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT)); 395 396 // set up Jetty and run the embedded server 397 HttpConfiguration httpConfig = new HttpConfiguration(); 398 httpConfig.setSecureScheme("https"); 399 httpConfig.setSecurePort(listenPort); 400 httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 401 httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 402 httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE); 403 httpConfig.setSendServerVersion(false); 404 httpConfig.setSendDateHeader(false); 405 406 ServerConnector serverConnector; 407 if (conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) { 408 HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); 409 httpsConfig.addCustomizer(new SecureRequestCustomizer()); 410 411 SslContextFactory.Server sslCtxFactory = new SslContextFactory.Server(); 412 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY); 413 String password = 414 HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null); 415 String keyPassword = 416 HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password); 417 sslCtxFactory.setKeyStorePath(keystore); 418 sslCtxFactory.setKeyStorePassword(password); 419 sslCtxFactory.setKeyManagerPassword(keyPassword); 420 sslCtxFactory 421 .setKeyStoreType(conf.get(THRIFT_SSL_KEYSTORE_TYPE_KEY, THRIFT_SSL_KEYSTORE_TYPE_DEFAULT)); 422 423 String[] excludeCiphers = 424 conf.getStrings(THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 425 if (excludeCiphers.length != 0) { 426 sslCtxFactory.setExcludeCipherSuites(excludeCiphers); 427 } 428 String[] includeCiphers = 429 conf.getStrings(THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 430 if (includeCiphers.length != 0) { 431 sslCtxFactory.setIncludeCipherSuites(includeCiphers); 432 } 433 434 // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566 435 String[] excludeProtocols = conf.getStrings(THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3"); 436 if (excludeProtocols.length != 0) { 437 sslCtxFactory.setExcludeProtocols(excludeProtocols); 438 } 439 String[] includeProtocols = 440 conf.getStrings(THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY); 441 if (includeProtocols.length != 0) { 442 sslCtxFactory.setIncludeProtocols(includeProtocols); 443 } 444 445 serverConnector = new ServerConnector(httpServer, 446 new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()), 447 new HttpConnectionFactory(httpsConfig)); 448 } else { 449 serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig)); 450 } 451 serverConnector.setPort(listenPort); 452 serverConnector.setHost(getBindAddress(conf).getHostAddress()); 453 httpServer.addConnector(serverConnector); 454 httpServer.setStopAtShutdown(true); 455 456 if (doAsEnabled) { 457 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 458 } 459 LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort)); 460 } 461 462 /** 463 * Setting up the thrift TServer 464 */ 465 protected void setupServer() throws Exception { 466 // Construct correct ProtocolFactory 467 TProtocolFactory protocolFactory = getProtocolFactory(); 468 469 ImplType implType = ImplType.getServerImpl(conf); 470 TProcessor processorToUse = processor; 471 472 // Construct correct TransportFactory 473 TTransportFactory transportFactory; 474 if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) { 475 if (qop != null) { 476 throw new RuntimeException( 477 "Thrift server authentication" + " doesn't work with framed transport yet"); 478 } 479 transportFactory = new TFramedTransport.Factory( 480 conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024); 481 LOG.debug("Using framed transport"); 482 } else if (qop == null) { 483 transportFactory = new TTransportFactory(); 484 } else { 485 // Extract the name from the principal 486 String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY); 487 if (thriftKerberosPrincipal == null) { 488 throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null"); 489 } 490 String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal); 491 Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name()); 492 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); 493 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, 494 new SaslRpcServer.SaslGssCallbackHandler() { 495 @Override 496 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 497 AuthorizeCallback ac = null; 498 for (Callback callback : callbacks) { 499 if (callback instanceof AuthorizeCallback) { 500 ac = (AuthorizeCallback) callback; 501 } else { 502 throw new UnsupportedCallbackException(callback, 503 "Unrecognized SASL GSSAPI Callback"); 504 } 505 } 506 if (ac != null) { 507 String authid = ac.getAuthenticationID(); 508 String authzid = ac.getAuthorizationID(); 509 if (!authid.equals(authzid)) { 510 ac.setAuthorized(false); 511 } else { 512 ac.setAuthorized(true); 513 String userName = SecurityUtil.getUserFromPrincipal(authzid); 514 LOG.info("Effective user: {}", userName); 515 ac.setAuthorizedID(userName); 516 } 517 } 518 } 519 }); 520 transportFactory = saslFactory; 521 522 // Create a processor wrapper, to get the caller 523 processorToUse = (inProt, outProt) -> { 524 TSaslServerTransport saslServerTransport = (TSaslServerTransport) inProt.getTransport(); 525 SaslServer saslServer = saslServerTransport.getSaslServer(); 526 String principal = saslServer.getAuthorizationID(); 527 hbaseServiceHandler.setEffectiveUser(principal); 528 processor.process(inProt, outProt); 529 }; 530 } 531 532 if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { 533 LOG.error( 534 "Server types {} don't support IP address binding at the moment. See " 535 + "https://issues.apache.org/jira/browse/HBASE-2155 for details.", 536 Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP())); 537 throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType); 538 } 539 540 InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort); 541 if ( 542 implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING 543 || implType == ImplType.THREADED_SELECTOR 544 ) { 545 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); 546 if (implType == ImplType.NONBLOCKING) { 547 tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse, 548 transportFactory, inetSocketAddress); 549 } else if (implType == ImplType.HS_HA) { 550 tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory, 551 inetSocketAddress); 552 } else { // THREADED_SELECTOR 553 tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse, 554 transportFactory, inetSocketAddress); 555 } 556 LOG.info("starting HBase {} server on {}", implType.simpleClassName(), 557 Integer.toString(listenPort)); 558 } else if (implType == ImplType.THREAD_POOL) { 559 this.tserver = 560 getTThreadPoolServer(protocolFactory, processorToUse, transportFactory, inetSocketAddress); 561 } else { 562 throw new AssertionError( 563 "Unsupported Thrift server implementation: " + implType.simpleClassName()); 564 } 565 566 // A sanity check that we instantiated the right type of server. 567 if (tserver.getClass() != implType.serverClass) { 568 throw new AssertionError("Expected to create Thrift server class " 569 + implType.serverClass.getName() + " but got " + tserver.getClass().getName()); 570 } 571 } 572 573 protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport, 574 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 575 InetSocketAddress inetSocketAddress) { 576 LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString()); 577 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); 578 serverArgs.processor(processor); 579 serverArgs.transportFactory(transportFactory); 580 serverArgs.protocolFactory(protocolFactory); 581 return new TNonblockingServer(serverArgs); 582 } 583 584 protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport, 585 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 586 InetSocketAddress inetSocketAddress) { 587 LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); 588 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); 589 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 590 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 591 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 592 int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 593 serverArgs.getMaxWorkerThreads()); 594 ExecutorService executorService = createExecutor(callQueue, workerThread, workerThread); 595 serverArgs.executorService(executorService).processor(processor) 596 .transportFactory(transportFactory).protocolFactory(protocolFactory); 597 return new THsHaServer(serverArgs); 598 } 599 600 protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport, 601 TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, 602 InetSocketAddress inetSocketAddress) { 603 LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); 604 TThreadedSelectorServer.Args serverArgs = 605 new HThreadedSelectorServerArgs(serverTransport, conf); 606 int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY, 607 TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS); 608 CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics); 609 int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, 610 serverArgs.getWorkerThreads()); 611 int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads()); 612 serverArgs.selectorThreads(selectorThreads); 613 ExecutorService executorService = createExecutor(callQueue, workerThreads, workerThreads); 614 serverArgs.executorService(executorService).processor(processor) 615 .transportFactory(transportFactory).protocolFactory(protocolFactory); 616 return new TThreadedSelectorServer(serverArgs); 617 } 618 619 protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, 620 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception { 621 LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); 622 // Thrift's implementation uses '0' as a placeholder for 'use the default.' 623 int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT); 624 int readTimeout = 625 conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT); 626 TServerTransport serverTransport = 627 new TServerSocket(new TServerSocket.ServerSocketTransportArgs().bindAddr(inetSocketAddress) 628 .backlog(backlog).clientTimeout(readTimeout)); 629 630 TBoundedThreadPoolServer.Args serverArgs = 631 new TBoundedThreadPoolServer.Args(serverTransport, conf); 632 serverArgs.processor(processor).transportFactory(transportFactory) 633 .protocolFactory(protocolFactory); 634 return new TBoundedThreadPoolServer(serverArgs, metrics); 635 } 636 637 protected TProtocolFactory getProtocolFactory() { 638 TProtocolFactory protocolFactory; 639 640 if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) { 641 LOG.debug("Using compact protocol"); 642 protocolFactory = new TCompactProtocol.Factory(); 643 } else { 644 LOG.debug("Using binary protocol"); 645 protocolFactory = new TBinaryProtocol.Factory(); 646 } 647 648 return protocolFactory; 649 } 650 651 protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, int minWorkers, 652 int maxWorkers) { 653 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 654 tfb.setDaemon(true); 655 tfb.setNameFormat("thrift-worker-%d"); 656 ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers, 657 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); 658 threadPool.allowCoreThreadTimeOut(true); 659 return threadPool; 660 } 661 662 protected InetAddress getBindAddress(Configuration conf) throws UnknownHostException { 663 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); 664 return InetAddress.getByName(bindAddressStr); 665 } 666 667 public static void registerFilters(Configuration conf) { 668 String[] filters = conf.getStrings(THRIFT_FILTERS); 669 Splitter splitter = Splitter.on(':'); 670 if (filters != null) { 671 for (String filterClass : filters) { 672 List<String> filterPart = splitter.splitToList(filterClass); 673 if (filterPart.size() != 2) { 674 LOG.warn("Invalid filter specification " + filterClass + " - skipping"); 675 } else { 676 ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1)); 677 } 678 } 679 } 680 } 681 682 /** 683 * Add options to command lines 684 * @param options options 685 */ 686 protected void addOptions(Options options) { 687 options.addOption("b", BIND_OPTION, true, 688 "Address to bind " + "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); 689 options.addOption("p", PORT_OPTION, true, 690 "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]"); 691 options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); 692 options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); 693 options.addOption("h", "help", false, "Print help information"); 694 options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use."); 695 options.addOption(null, INFOPORT_OPTION, true, "Port for web UI"); 696 697 options.addOption("m", MIN_WORKERS_OPTION, true, 698 "The minimum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName()); 699 700 options.addOption("w", MAX_WORKERS_OPTION, true, 701 "The maximum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName()); 702 703 options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, 704 "The maximum number of queued requests in " + ImplType.THREAD_POOL.simpleClassName()); 705 706 options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, 707 "The amount of time in secods to keep a thread alive when idle in " 708 + ImplType.THREAD_POOL.simpleClassName()); 709 710 options.addOption("t", READ_TIMEOUT_OPTION, true, 711 "Amount of time in milliseconds before a server thread will timeout " 712 + "waiting for client to send data on a connected socket. Currently, " 713 + "only applies to TBoundedThreadPoolServer"); 714 715 options.addOptionGroup(ImplType.createOptionGroup()); 716 } 717 718 protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException { 719 // Get port to bind to 720 try { 721 if (cmd.hasOption(PORT_OPTION)) { 722 int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); 723 conf.setInt(PORT_CONF_KEY, listenPort); 724 } 725 } catch (NumberFormatException e) { 726 LOG.error("Could not parse the value provided for the port option", e); 727 printUsageAndExit(options, -1); 728 } 729 // check for user-defined info server port setting, if so override the conf 730 try { 731 if (cmd.hasOption(INFOPORT_OPTION)) { 732 String val = cmd.getOptionValue(INFOPORT_OPTION); 733 conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val)); 734 LOG.debug("Web UI port set to " + val); 735 } 736 } catch (NumberFormatException e) { 737 LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + " option", e); 738 printUsageAndExit(options, -1); 739 } 740 // Make optional changes to the configuration based on command-line options 741 optionToConf(cmd, MIN_WORKERS_OPTION, conf, 742 TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); 743 optionToConf(cmd, MAX_WORKERS_OPTION, conf, 744 TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); 745 optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, conf, 746 TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); 747 optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf, 748 TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); 749 optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY); 750 optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM); 751 752 // Set general thrift server options 753 boolean compact = cmd.hasOption(COMPACT_OPTION) || conf.getBoolean(COMPACT_CONF_KEY, false); 754 conf.setBoolean(COMPACT_CONF_KEY, compact); 755 boolean framed = cmd.hasOption(FRAMED_OPTION) || conf.getBoolean(FRAMED_CONF_KEY, false); 756 conf.setBoolean(FRAMED_CONF_KEY, framed); 757 758 optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY); 759 760 ImplType.setServerImpl(cmd, conf); 761 } 762 763 /** 764 * Parse the command line options to set parameters the conf. 765 */ 766 protected void processOptions(final String[] args) throws Exception { 767 if (args == null || args.length == 0) { 768 return; 769 } 770 Options options = new Options(); 771 addOptions(options); 772 773 CommandLineParser parser = new DefaultParser(); 774 CommandLine cmd = parser.parse(options, args); 775 776 if (cmd.hasOption("help")) { 777 printUsageAndExit(options, 1); 778 } 779 parseCommandLine(cmd, options); 780 } 781 782 public void stop() { 783 if (this.infoServer != null) { 784 LOG.info("Stopping infoServer"); 785 try { 786 this.infoServer.stop(); 787 } catch (Exception ex) { 788 LOG.error("Failed to stop infoServer", ex); 789 } 790 } 791 if (pauseMonitor != null) { 792 pauseMonitor.stop(); 793 } 794 if (tserver != null) { 795 tserver.stop(); 796 tserver = null; 797 } 798 if (httpServer != null) { 799 try { 800 httpServer.stop(); 801 httpServer = null; 802 } catch (Exception e) { 803 LOG.error("Problem encountered in shutting down HTTP server", e); 804 } 805 httpServer = null; 806 } 807 } 808 809 protected static void optionToConf(CommandLine cmd, String option, Configuration conf, 810 String destConfKey) { 811 if (cmd.hasOption(option)) { 812 String value = cmd.getOptionValue(option); 813 LOG.info("Set configuration key:" + destConfKey + " value:" + value); 814 conf.set(destConfKey, value); 815 } 816 } 817 818 /** 819 * Run without any command line arguments 820 * @return exit code 821 * @throws Exception exception 822 */ 823 public int run() throws Exception { 824 return run(null); 825 } 826 827 @Override 828 public int run(String[] strings) throws Exception { 829 processOptions(strings); 830 setupParamters(); 831 if (httpEnabled) { 832 setupHTTPServer(); 833 } else { 834 setupServer(); 835 } 836 return serviceUGI.doAs(new PrivilegedAction<Integer>() { 837 @Override 838 public Integer run() { 839 try { 840 startInfoServer(); 841 if (httpEnabled) { 842 httpServer.start(); 843 httpServer.join(); 844 } else { 845 tserver.serve(); 846 } 847 return 0; 848 } catch (Exception e) { 849 LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e); 850 return -1; 851 } 852 } 853 }); 854 } 855 856 public static void main(String[] args) throws Exception { 857 LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****"); 858 VersionInfo.logVersion(); 859 final Configuration conf = HBaseConfiguration.create(); 860 // for now, only time we return is on an argument error. 861 final int status = ToolRunner.run(conf, new ThriftServer(conf), args); 862 LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****"); 863 System.exit(status); 864 } 865}