001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
021import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
022import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
023import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
024import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
025import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
026import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
027import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
028import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
029import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
030import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
031import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
032import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
033import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
034import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
035import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
036import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
037import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
038import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
039import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
040import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
041import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
042import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
043import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
044import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
045import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
046import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
047import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
048import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
049import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
050import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
051import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
052import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
053import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
054import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
055import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
056import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
057import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
058import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
059import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
060import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
061import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
062import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
063import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
064import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
065import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
066import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
067import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
068import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
069import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
070import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
071import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
072import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
073import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_DEFAULT;
074import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_TYPE_KEY;
075import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
076import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
077
078import java.io.IOException;
079import java.net.InetAddress;
080import java.net.InetSocketAddress;
081import java.net.UnknownHostException;
082import java.security.PrivilegedAction;
083import java.util.List;
084import java.util.Map;
085import java.util.concurrent.BlockingQueue;
086import java.util.concurrent.ExecutorService;
087import java.util.concurrent.LinkedBlockingQueue;
088import java.util.concurrent.ThreadPoolExecutor;
089import java.util.concurrent.TimeUnit;
090import javax.security.auth.callback.Callback;
091import javax.security.auth.callback.UnsupportedCallbackException;
092import javax.security.sasl.AuthorizeCallback;
093import javax.security.sasl.SaslServer;
094import org.apache.commons.lang3.ArrayUtils;
095import org.apache.hadoop.conf.Configuration;
096import org.apache.hadoop.conf.Configured;
097import org.apache.hadoop.hbase.HBaseConfiguration;
098import org.apache.hadoop.hbase.HBaseInterfaceAudience;
099import org.apache.hadoop.hbase.filter.ParseFilter;
100import org.apache.hadoop.hbase.http.HttpServerUtil;
101import org.apache.hadoop.hbase.http.InfoServer;
102import org.apache.hadoop.hbase.log.HBaseMarkers;
103import org.apache.hadoop.hbase.security.SaslUtil;
104import org.apache.hadoop.hbase.security.SecurityUtil;
105import org.apache.hadoop.hbase.security.UserProvider;
106import org.apache.hadoop.hbase.thrift.generated.Hbase;
107import org.apache.hadoop.hbase.util.DNS;
108import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
109import org.apache.hadoop.hbase.util.JvmPauseMonitor;
110import org.apache.hadoop.hbase.util.Strings;
111import org.apache.hadoop.hbase.util.VersionInfo;
112import org.apache.hadoop.security.SaslRpcServer;
113import org.apache.hadoop.security.UserGroupInformation;
114import org.apache.hadoop.security.authorize.ProxyUsers;
115import org.apache.hadoop.util.Shell.ExitCodeException;
116import org.apache.hadoop.util.Tool;
117import org.apache.hadoop.util.ToolRunner;
118import org.apache.thrift.TProcessor;
119import org.apache.thrift.protocol.TBinaryProtocol;
120import org.apache.thrift.protocol.TCompactProtocol;
121import org.apache.thrift.protocol.TProtocolFactory;
122import org.apache.thrift.server.THsHaServer;
123import org.apache.thrift.server.TNonblockingServer;
124import org.apache.thrift.server.TServer;
125import org.apache.thrift.server.TServlet;
126import org.apache.thrift.server.TThreadedSelectorServer;
127import org.apache.thrift.transport.TNonblockingServerSocket;
128import org.apache.thrift.transport.TNonblockingServerTransport;
129import org.apache.thrift.transport.TSaslServerTransport;
130import org.apache.thrift.transport.TServerSocket;
131import org.apache.thrift.transport.TServerTransport;
132import org.apache.thrift.transport.TTransportFactory;
133import org.apache.thrift.transport.layered.TFramedTransport;
134import org.apache.yetus.audience.InterfaceAudience;
135import org.slf4j.Logger;
136import org.slf4j.LoggerFactory;
137
138import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
139import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
140import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
141import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
142import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
143import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
144import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
145import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
146import org.apache.hbase.thirdparty.org.eclipse.jetty.http.HttpVersion;
147import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConfiguration;
148import org.apache.hbase.thirdparty.org.eclipse.jetty.server.HttpConnectionFactory;
149import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SecureRequestCustomizer;
150import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
151import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
152import org.apache.hbase.thirdparty.org.eclipse.jetty.server.SslConnectionFactory;
153import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletContextHandler;
154import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
155import org.apache.hbase.thirdparty.org.eclipse.jetty.util.ssl.SslContextFactory;
156import org.apache.hbase.thirdparty.org.eclipse.jetty.util.thread.QueuedThreadPool;
157
158/**
159 * ThriftServer- this class starts up a Thrift server which implements the Hbase API specified in
160 * the Hbase.thrift IDL file. The server runs in an independent process.
161 */
162@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
163public class ThriftServer extends Configured implements Tool {
164
165  private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
166
167  protected Configuration conf;
168
169  protected InfoServer infoServer;
170
171  protected TProcessor processor;
172
173  protected ThriftMetrics metrics;
174  protected HBaseServiceHandler hbaseServiceHandler;
175  protected UserGroupInformation serviceUGI;
176  protected UserGroupInformation httpUGI;
177  protected boolean httpEnabled;
178
179  protected SaslUtil.QualityOfProtection qop;
180  protected String host;
181  protected int listenPort;
182
183  protected boolean securityEnabled;
184  protected boolean doAsEnabled;
185
186  protected JvmPauseMonitor pauseMonitor;
187
188  protected volatile TServer tserver;
189  protected volatile Server httpServer;
190
191  //
192  // Main program and support routines
193  //
194
195  public ThriftServer(Configuration conf) {
196    this.conf = HBaseConfiguration.create(conf);
197  }
198
199  protected ThriftMetrics createThriftMetrics(Configuration conf) {
200    return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
201  }
202
203  protected void setupParamters() throws IOException {
204    // login the server principal (if using secure Hadoop)
205    UserProvider userProvider = UserProvider.instantiate(conf);
206    securityEnabled =
207      userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled();
208    if (securityEnabled) {
209      host = Strings.domainNamePointerToHostName(
210        DNS.getDefaultHost(conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
211          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
212      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
213
214      // Setup the SPNEGO user for HTTP if configured
215      String spnegoPrincipal = getSpengoPrincipal(conf, host);
216      String spnegoKeytab = getSpnegoKeytab(conf);
217      UserGroupInformation.setConfiguration(conf);
218      // login the SPNEGO principal using UGI to avoid polluting the login user
219      this.httpUGI =
220        UserGroupInformation.loginUserFromKeytabAndReturnUGI(spnegoPrincipal, spnegoKeytab);
221    }
222    this.serviceUGI = userProvider.getCurrent().getUGI();
223    if (httpUGI == null) {
224      this.httpUGI = serviceUGI;
225    }
226
227    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
228    this.metrics = createThriftMetrics(conf);
229    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
230    this.hbaseServiceHandler = createHandler(conf, userProvider);
231    this.hbaseServiceHandler.initMetrics(metrics);
232    this.processor = createProcessor();
233
234    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
235    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
236    if (doAsEnabled && !httpEnabled) {
237      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
238    }
239
240    String strQop = conf.get(THRIFT_QOP_KEY);
241    if (strQop != null) {
242      this.qop = SaslUtil.getQop(strQop);
243    }
244    if (qop != null) {
245      if (
246        qop != SaslUtil.QualityOfProtection.AUTHENTICATION
247          && qop != SaslUtil.QualityOfProtection.INTEGRITY
248          && qop != SaslUtil.QualityOfProtection.PRIVACY
249      ) {
250        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
251          THRIFT_QOP_KEY, SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
252          SaslUtil.QualityOfProtection.INTEGRITY.name(),
253          SaslUtil.QualityOfProtection.PRIVACY.name()));
254      }
255      checkHttpSecurity(qop, conf);
256      if (!securityEnabled) {
257        throw new IOException("Thrift server must run in secure mode to support authentication");
258      }
259    }
260    registerFilters(conf);
261    pauseMonitor.start();
262  }
263
264  private String getSpengoPrincipal(Configuration conf, String host) throws IOException {
265    String principal = conf.get(THRIFT_SPNEGO_PRINCIPAL_KEY);
266    if (principal == null) {
267      // We cannot use the Hadoop configuration deprecation handling here since
268      // the THRIFT_KERBEROS_PRINCIPAL_KEY config is still valid for regular Kerberos
269      // communication. The preference should be to use the THRIFT_SPNEGO_PRINCIPAL_KEY
270      // config so that THRIFT_KERBEROS_PRINCIPAL_KEY doesn't control both backend
271      // Kerberos principal and SPNEGO principal.
272      LOG.info("Using deprecated {} config for SPNEGO principal. Use {} instead.",
273        THRIFT_KERBEROS_PRINCIPAL_KEY, THRIFT_SPNEGO_PRINCIPAL_KEY);
274      principal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
275    }
276    // Handle _HOST in principal value
277    return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, host);
278  }
279
280  private String getSpnegoKeytab(Configuration conf) {
281    String keytab = conf.get(THRIFT_SPNEGO_KEYTAB_FILE_KEY);
282    if (keytab == null) {
283      // We cannot use the Hadoop configuration deprecation handling here since
284      // the THRIFT_KEYTAB_FILE_KEY config is still valid for regular Kerberos
285      // communication. The preference should be to use the THRIFT_SPNEGO_KEYTAB_FILE_KEY
286      // config so that THRIFT_KEYTAB_FILE_KEY doesn't control both backend
287      // Kerberos keytab and SPNEGO keytab.
288      LOG.info("Using deprecated {} config for SPNEGO keytab. Use {} instead.",
289        THRIFT_KEYTAB_FILE_KEY, THRIFT_SPNEGO_KEYTAB_FILE_KEY);
290      keytab = conf.get(THRIFT_KEYTAB_FILE_KEY);
291    }
292    return keytab;
293  }
294
295  protected void startInfoServer() throws IOException {
296    // Put up info server.
297    int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT);
298
299    if (port >= 0) {
300      conf.setLong("startcode", EnvironmentEdgeManager.currentTime());
301      String a =
302        conf.get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
303      infoServer = new InfoServer("thrift", a, port, false, conf);
304      infoServer.setAttribute("hbase.conf", conf);
305      infoServer.setAttribute("hbase.thrift.server.type", metrics.getThriftServerType().name());
306      infoServer.start();
307    }
308  }
309
310  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
311    if (
312      qop == SaslUtil.QualityOfProtection.PRIVACY && conf.getBoolean(USE_HTTP_CONF_KEY, false)
313        && !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)
314    ) {
315      throw new IllegalArgumentException(
316        "Thrift HTTP Server's QoP is privacy, but " + THRIFT_SSL_ENABLED_KEY + " is false");
317    }
318  }
319
320  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
321    throws IOException {
322    return new ThriftHBaseServiceHandler(conf, userProvider);
323  }
324
325  protected TProcessor createProcessor() {
326    return new Hbase.Processor<>(
327      HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
328  }
329
330  /**
331   * the thrift server, not null means the server is started, for test only
332   * @return the tServer
333   */
334  @InterfaceAudience.Private
335  public TServer getTserver() {
336    return tserver;
337  }
338
339  /**
340   * the Jetty server, not null means the HTTP server is started, for test only
341   * @return the http server
342   */
343  @InterfaceAudience.Private
344  public Server getHttpServer() {
345    return httpServer;
346  }
347
348  protected void printUsageAndExit(Options options, int exitCode) throws ExitCodeException {
349    HelpFormatter formatter = new HelpFormatter();
350    formatter.printHelp("Thrift", null, options,
351      "To start the Thrift server run 'hbase-daemon.sh start thrift' or " + "'hbase thrift'\n"
352        + "To shutdown the thrift server run 'hbase-daemon.sh stop "
353        + "thrift' or send a kill signal to the thrift server pid",
354      true);
355    throw new ExitCodeException(exitCode, "");
356  }
357
358  /**
359   * Create a Servlet for the http server
360   * @param protocolFactory protocolFactory
361   * @return the servlet
362   */
363  protected TServlet createTServlet(TProtocolFactory protocolFactory) {
364    return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, httpUGI,
365      hbaseServiceHandler, securityEnabled, doAsEnabled);
366  }
367
368  /**
369   * Setup an HTTP Server using Jetty to serve calls from THttpClient
370   * @throws IOException IOException
371   */
372  protected void setupHTTPServer() throws IOException {
373    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
374    TServlet thriftHttpServlet = createTServlet(protocolFactory);
375
376    // Set the default max thread number to 100 to limit
377    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
378    // Jetty set the default max thread number to 250, if we don't set it.
379    //
380    // Our default min thread number 2 is the same as that used by Jetty.
381    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, conf
382      .getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY, HTTP_MIN_THREADS_KEY_DEFAULT));
383    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, conf
384      .getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY, HTTP_MAX_THREADS_KEY_DEFAULT));
385    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
386    threadPool.setMinThreads(minThreads);
387    httpServer = new Server(threadPool);
388
389    // Context handler
390    ServletContextHandler ctxHandler =
391      new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
392    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
393    HttpServerUtil.constrainHttpMethods(ctxHandler,
394      conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
395
396    // set up Jetty and run the embedded server
397    HttpConfiguration httpConfig = new HttpConfiguration();
398    httpConfig.setSecureScheme("https");
399    httpConfig.setSecurePort(listenPort);
400    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
401    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
402    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
403    httpConfig.setSendServerVersion(false);
404    httpConfig.setSendDateHeader(false);
405
406    ServerConnector serverConnector;
407    if (conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
408      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
409      httpsConfig.addCustomizer(new SecureRequestCustomizer());
410
411      SslContextFactory.Server sslCtxFactory = new SslContextFactory.Server();
412      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
413      String password =
414        HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
415      String keyPassword =
416        HBaseConfiguration.getPassword(conf, THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
417      sslCtxFactory.setKeyStorePath(keystore);
418      sslCtxFactory.setKeyStorePassword(password);
419      sslCtxFactory.setKeyManagerPassword(keyPassword);
420      sslCtxFactory
421        .setKeyStoreType(conf.get(THRIFT_SSL_KEYSTORE_TYPE_KEY, THRIFT_SSL_KEYSTORE_TYPE_DEFAULT));
422
423      String[] excludeCiphers =
424        conf.getStrings(THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
425      if (excludeCiphers.length != 0) {
426        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
427      }
428      String[] includeCiphers =
429        conf.getStrings(THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
430      if (includeCiphers.length != 0) {
431        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
432      }
433
434      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
435      String[] excludeProtocols = conf.getStrings(THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
436      if (excludeProtocols.length != 0) {
437        sslCtxFactory.setExcludeProtocols(excludeProtocols);
438      }
439      String[] includeProtocols =
440        conf.getStrings(THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
441      if (includeProtocols.length != 0) {
442        sslCtxFactory.setIncludeProtocols(includeProtocols);
443      }
444
445      serverConnector = new ServerConnector(httpServer,
446        new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
447        new HttpConnectionFactory(httpsConfig));
448    } else {
449      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
450    }
451    serverConnector.setPort(listenPort);
452    serverConnector.setHost(getBindAddress(conf).getHostAddress());
453    httpServer.addConnector(serverConnector);
454    httpServer.setStopAtShutdown(true);
455
456    if (doAsEnabled) {
457      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
458    }
459    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
460  }
461
462  /**
463   * Setting up the thrift TServer
464   */
465  protected void setupServer() throws Exception {
466    // Construct correct ProtocolFactory
467    TProtocolFactory protocolFactory = getProtocolFactory();
468
469    ImplType implType = ImplType.getServerImpl(conf);
470    TProcessor processorToUse = processor;
471
472    // Construct correct TransportFactory
473    TTransportFactory transportFactory;
474    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
475      if (qop != null) {
476        throw new RuntimeException(
477          "Thrift server authentication" + " doesn't work with framed transport yet");
478      }
479      transportFactory = new TFramedTransport.Factory(
480        conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
481      LOG.debug("Using framed transport");
482    } else if (qop == null) {
483      transportFactory = new TTransportFactory();
484    } else {
485      // Extract the name from the principal
486      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
487      if (thriftKerberosPrincipal == null) {
488        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
489      }
490      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
491      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
492      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
493      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
494        new SaslRpcServer.SaslGssCallbackHandler() {
495          @Override
496          public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
497            AuthorizeCallback ac = null;
498            for (Callback callback : callbacks) {
499              if (callback instanceof AuthorizeCallback) {
500                ac = (AuthorizeCallback) callback;
501              } else {
502                throw new UnsupportedCallbackException(callback,
503                  "Unrecognized SASL GSSAPI Callback");
504              }
505            }
506            if (ac != null) {
507              String authid = ac.getAuthenticationID();
508              String authzid = ac.getAuthorizationID();
509              if (!authid.equals(authzid)) {
510                ac.setAuthorized(false);
511              } else {
512                ac.setAuthorized(true);
513                String userName = SecurityUtil.getUserFromPrincipal(authzid);
514                LOG.info("Effective user: {}", userName);
515                ac.setAuthorizedID(userName);
516              }
517            }
518          }
519        });
520      transportFactory = saslFactory;
521
522      // Create a processor wrapper, to get the caller
523      processorToUse = (inProt, outProt) -> {
524        TSaslServerTransport saslServerTransport = (TSaslServerTransport) inProt.getTransport();
525        SaslServer saslServer = saslServerTransport.getSaslServer();
526        String principal = saslServer.getAuthorizationID();
527        hbaseServiceHandler.setEffectiveUser(principal);
528        processor.process(inProt, outProt);
529      };
530    }
531
532    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
533      LOG.error(
534        "Server types {} don't support IP address binding at the moment. See "
535          + "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
536        Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
537      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
538    }
539
540    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
541    if (
542      implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING
543        || implType == ImplType.THREADED_SELECTOR
544    ) {
545      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
546      if (implType == ImplType.NONBLOCKING) {
547        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
548          transportFactory, inetSocketAddress);
549      } else if (implType == ImplType.HS_HA) {
550        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
551          inetSocketAddress);
552      } else { // THREADED_SELECTOR
553        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
554          transportFactory, inetSocketAddress);
555      }
556      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
557        Integer.toString(listenPort));
558    } else if (implType == ImplType.THREAD_POOL) {
559      this.tserver =
560        getTThreadPoolServer(protocolFactory, processorToUse, transportFactory, inetSocketAddress);
561    } else {
562      throw new AssertionError(
563        "Unsupported Thrift server implementation: " + implType.simpleClassName());
564    }
565
566    // A sanity check that we instantiated the right type of server.
567    if (tserver.getClass() != implType.serverClass) {
568      throw new AssertionError("Expected to create Thrift server class "
569        + implType.serverClass.getName() + " but got " + tserver.getClass().getName());
570    }
571  }
572
573  protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
574    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
575    InetSocketAddress inetSocketAddress) {
576    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
577    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
578    serverArgs.processor(processor);
579    serverArgs.transportFactory(transportFactory);
580    serverArgs.protocolFactory(protocolFactory);
581    return new TNonblockingServer(serverArgs);
582  }
583
584  protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
585    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
586    InetSocketAddress inetSocketAddress) {
587    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
588    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
589    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
590      TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
591    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
592    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
593      serverArgs.getMaxWorkerThreads());
594    ExecutorService executorService = createExecutor(callQueue, workerThread, workerThread);
595    serverArgs.executorService(executorService).processor(processor)
596      .transportFactory(transportFactory).protocolFactory(protocolFactory);
597    return new THsHaServer(serverArgs);
598  }
599
600  protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
601    TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
602    InetSocketAddress inetSocketAddress) {
603    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
604    TThreadedSelectorServer.Args serverArgs =
605      new HThreadedSelectorServerArgs(serverTransport, conf);
606    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
607      TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
608    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
609    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
610      serverArgs.getWorkerThreads());
611    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
612    serverArgs.selectorThreads(selectorThreads);
613    ExecutorService executorService = createExecutor(callQueue, workerThreads, workerThreads);
614    serverArgs.executorService(executorService).processor(processor)
615      .transportFactory(transportFactory).protocolFactory(protocolFactory);
616    return new TThreadedSelectorServer(serverArgs);
617  }
618
619  protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
620    TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
621    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
622    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
623    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
624    int readTimeout =
625      conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY, THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
626    TServerTransport serverTransport =
627      new TServerSocket(new TServerSocket.ServerSocketTransportArgs().bindAddr(inetSocketAddress)
628        .backlog(backlog).clientTimeout(readTimeout));
629
630    TBoundedThreadPoolServer.Args serverArgs =
631      new TBoundedThreadPoolServer.Args(serverTransport, conf);
632    serverArgs.processor(processor).transportFactory(transportFactory)
633      .protocolFactory(protocolFactory);
634    return new TBoundedThreadPoolServer(serverArgs, metrics);
635  }
636
637  protected TProtocolFactory getProtocolFactory() {
638    TProtocolFactory protocolFactory;
639
640    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
641      LOG.debug("Using compact protocol");
642      protocolFactory = new TCompactProtocol.Factory();
643    } else {
644      LOG.debug("Using binary protocol");
645      protocolFactory = new TBinaryProtocol.Factory();
646    }
647
648    return protocolFactory;
649  }
650
651  protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue, int minWorkers,
652    int maxWorkers) {
653    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
654    tfb.setDaemon(true);
655    tfb.setNameFormat("thrift-worker-%d");
656    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
657      Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
658    threadPool.allowCoreThreadTimeOut(true);
659    return threadPool;
660  }
661
662  protected InetAddress getBindAddress(Configuration conf) throws UnknownHostException {
663    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
664    return InetAddress.getByName(bindAddressStr);
665  }
666
667  public static void registerFilters(Configuration conf) {
668    String[] filters = conf.getStrings(THRIFT_FILTERS);
669    Splitter splitter = Splitter.on(':');
670    if (filters != null) {
671      for (String filterClass : filters) {
672        List<String> filterPart = splitter.splitToList(filterClass);
673        if (filterPart.size() != 2) {
674          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
675        } else {
676          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
677        }
678      }
679    }
680  }
681
682  /**
683   * Add options to command lines
684   * @param options options
685   */
686  protected void addOptions(Options options) {
687    options.addOption("b", BIND_OPTION, true,
688      "Address to bind " + "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
689    options.addOption("p", PORT_OPTION, true,
690      "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
691    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
692    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
693    options.addOption("h", "help", false, "Print help information");
694    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
695    options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
696
697    options.addOption("m", MIN_WORKERS_OPTION, true,
698      "The minimum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName());
699
700    options.addOption("w", MAX_WORKERS_OPTION, true,
701      "The maximum number of worker threads for " + ImplType.THREAD_POOL.simpleClassName());
702
703    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
704      "The maximum number of queued requests in " + ImplType.THREAD_POOL.simpleClassName());
705
706    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
707      "The amount of time in secods to keep a thread alive when idle in "
708        + ImplType.THREAD_POOL.simpleClassName());
709
710    options.addOption("t", READ_TIMEOUT_OPTION, true,
711      "Amount of time in milliseconds before a server thread will timeout "
712        + "waiting for client to send data on a connected socket. Currently, "
713        + "only applies to TBoundedThreadPoolServer");
714
715    options.addOptionGroup(ImplType.createOptionGroup());
716  }
717
718  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
719    // Get port to bind to
720    try {
721      if (cmd.hasOption(PORT_OPTION)) {
722        int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
723        conf.setInt(PORT_CONF_KEY, listenPort);
724      }
725    } catch (NumberFormatException e) {
726      LOG.error("Could not parse the value provided for the port option", e);
727      printUsageAndExit(options, -1);
728    }
729    // check for user-defined info server port setting, if so override the conf
730    try {
731      if (cmd.hasOption(INFOPORT_OPTION)) {
732        String val = cmd.getOptionValue(INFOPORT_OPTION);
733        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
734        LOG.debug("Web UI port set to " + val);
735      }
736    } catch (NumberFormatException e) {
737      LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION + " option", e);
738      printUsageAndExit(options, -1);
739    }
740    // Make optional changes to the configuration based on command-line options
741    optionToConf(cmd, MIN_WORKERS_OPTION, conf,
742      TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
743    optionToConf(cmd, MAX_WORKERS_OPTION, conf,
744      TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
745    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, conf,
746      TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
747    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf,
748      TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
749    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
750    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
751
752    // Set general thrift server options
753    boolean compact = cmd.hasOption(COMPACT_OPTION) || conf.getBoolean(COMPACT_CONF_KEY, false);
754    conf.setBoolean(COMPACT_CONF_KEY, compact);
755    boolean framed = cmd.hasOption(FRAMED_OPTION) || conf.getBoolean(FRAMED_CONF_KEY, false);
756    conf.setBoolean(FRAMED_CONF_KEY, framed);
757
758    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
759
760    ImplType.setServerImpl(cmd, conf);
761  }
762
763  /**
764   * Parse the command line options to set parameters the conf.
765   */
766  protected void processOptions(final String[] args) throws Exception {
767    if (args == null || args.length == 0) {
768      return;
769    }
770    Options options = new Options();
771    addOptions(options);
772
773    CommandLineParser parser = new DefaultParser();
774    CommandLine cmd = parser.parse(options, args);
775
776    if (cmd.hasOption("help")) {
777      printUsageAndExit(options, 1);
778    }
779    parseCommandLine(cmd, options);
780  }
781
782  public void stop() {
783    if (this.infoServer != null) {
784      LOG.info("Stopping infoServer");
785      try {
786        this.infoServer.stop();
787      } catch (Exception ex) {
788        LOG.error("Failed to stop infoServer", ex);
789      }
790    }
791    if (pauseMonitor != null) {
792      pauseMonitor.stop();
793    }
794    if (tserver != null) {
795      tserver.stop();
796      tserver = null;
797    }
798    if (httpServer != null) {
799      try {
800        httpServer.stop();
801        httpServer = null;
802      } catch (Exception e) {
803        LOG.error("Problem encountered in shutting down HTTP server", e);
804      }
805      httpServer = null;
806    }
807  }
808
809  protected static void optionToConf(CommandLine cmd, String option, Configuration conf,
810    String destConfKey) {
811    if (cmd.hasOption(option)) {
812      String value = cmd.getOptionValue(option);
813      LOG.info("Set configuration key:" + destConfKey + " value:" + value);
814      conf.set(destConfKey, value);
815    }
816  }
817
818  /**
819   * Run without any command line arguments
820   * @return exit code
821   * @throws Exception exception
822   */
823  public int run() throws Exception {
824    return run(null);
825  }
826
827  @Override
828  public int run(String[] strings) throws Exception {
829    processOptions(strings);
830    setupParamters();
831    if (httpEnabled) {
832      setupHTTPServer();
833    } else {
834      setupServer();
835    }
836    return serviceUGI.doAs(new PrivilegedAction<Integer>() {
837      @Override
838      public Integer run() {
839        try {
840          startInfoServer();
841          if (httpEnabled) {
842            httpServer.start();
843            httpServer.join();
844          } else {
845            tserver.serve();
846          }
847          return 0;
848        } catch (Exception e) {
849          LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e);
850          return -1;
851        }
852      }
853    });
854  }
855
856  public static void main(String[] args) throws Exception {
857    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
858    VersionInfo.logVersion();
859    final Configuration conf = HBaseConfiguration.create();
860    // for now, only time we return is on an argument error.
861    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
862    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
863    System.exit(status);
864  }
865}