001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift2.client;
019
020import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
021import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.lang.reflect.Constructor;
026import java.net.UnknownHostException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.ExecutorService;
031import javax.net.ssl.SSLException;
032import org.apache.commons.lang3.NotImplementedException;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.BufferedMutator;
038import org.apache.hadoop.hbase.client.BufferedMutatorParams;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionUtils;
041import org.apache.hadoop.hbase.client.RegionLocator;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableBuilder;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.thrift.Constants;
046import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.http.HttpRequest;
049import org.apache.http.client.HttpClient;
050import org.apache.http.client.config.RequestConfig;
051import org.apache.http.client.utils.HttpClientUtils;
052import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
053import org.apache.http.impl.client.HttpClientBuilder;
054import org.apache.http.protocol.HttpContext;
055import org.apache.thrift.TException;
056import org.apache.thrift.protocol.TBinaryProtocol;
057import org.apache.thrift.protocol.TCompactProtocol;
058import org.apache.thrift.protocol.TProtocol;
059import org.apache.thrift.transport.THttpClient;
060import org.apache.thrift.transport.TSocket;
061import org.apache.thrift.transport.TTransport;
062import org.apache.thrift.transport.TTransportException;
063import org.apache.thrift.transport.layered.TFramedTransport;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
069
070@InterfaceAudience.Private
071public class ThriftConnection implements Connection {
072  private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class);
073  private Configuration conf;
074  private User user;
075  // For HTTP protocol
076  private HttpClient httpClient;
077  private boolean httpClientCreated = false;
078  private boolean isClosed = false;
079
080  private String host;
081  private int port;
082  private boolean isFramed = false;
083  private boolean isCompact = false;
084
085  // TODO: We can rip out the ThriftClient piece of it rather than creating a new client every time.
086  ThriftClientBuilder clientBuilder;
087
088  private int operationTimeout;
089  private int connectTimeout;
090
091  public ThriftConnection(Configuration conf, ExecutorService pool, final User user,
092    Map<String, byte[]> connectionAttributes) throws IOException {
093    this.conf = conf;
094    this.user = user;
095    this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
096    this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
097    Preconditions.checkArgument(port > 0);
098    Preconditions.checkArgument(host != null);
099    this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
100    this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
101    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
102      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
103    this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
104
105    String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
106      DefaultThriftClientBuilder.class.getName());
107    try {
108      Class<?> clazz = Class.forName(className);
109      Constructor<?> constructor = clazz.getDeclaredConstructor(ThriftConnection.class);
110      constructor.setAccessible(true);
111      clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
112    } catch (Exception e) {
113      throw new IOException(e);
114    }
115  }
116
117  public synchronized void setHttpClient(HttpClient httpClient) {
118    this.httpClient = httpClient;
119  }
120
121  @Override
122  public Configuration getConfiguration() {
123    return conf;
124  }
125
126  public String getHost() {
127    return host;
128  }
129
130  public int getPort() {
131    return port;
132  }
133
134  public boolean isFramed() {
135    return isFramed;
136  }
137
138  public boolean isCompact() {
139    return isCompact;
140  }
141
142  public int getOperationTimeout() {
143    return operationTimeout;
144  }
145
146  public int getConnectTimeout() {
147    return connectTimeout;
148  }
149
150  /**
151   * the default thrift client builder. One can extend the ThriftClientBuilder to builder custom
152   * client, implement features like authentication(hbase-examples/thrift/DemoClient)
153   */
154  public static class DefaultThriftClientBuilder extends ThriftClientBuilder {
155
156    @Override
157    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
158      TTransport tTransport = null;
159      try {
160        TSocket sock = new TSocket(connection.getHost(), connection.getPort());
161        sock.setSocketTimeout(connection.getOperationTimeout());
162        sock.setConnectTimeout(connection.getConnectTimeout());
163        tTransport = sock;
164        if (connection.isFramed()) {
165          tTransport = new TFramedTransport(tTransport);
166        }
167
168        sock.open();
169      } catch (TTransportException e) {
170        throw new IOException(e);
171      }
172      TProtocol prot;
173      if (connection.isCompact()) {
174        prot = new TCompactProtocol(tTransport);
175      } else {
176        prot = new TBinaryProtocol(tTransport);
177      }
178      THBaseService.Client client = new THBaseService.Client(prot);
179      return new Pair<>(client, tTransport);
180    }
181
182    public DefaultThriftClientBuilder(ThriftConnection connection) {
183      super(connection);
184    }
185  }
186
187  /**
188   * the default thrift http client builder. One can extend the ThriftClientBuilder to builder
189   * custom http client, implement features like authentication or
190   * 'DoAs'(hbase-examples/thrift/HttpDoAsClient)
191   */
192  public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
193    Map<String, String> customHeader = new HashMap<>();
194
195    public HTTPThriftClientBuilder(ThriftConnection connection) {
196      super(connection);
197    }
198
199    public void addCostumHeader(String key, String value) {
200      customHeader.put(key, value);
201    }
202
203    @Override
204    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
205      Preconditions.checkArgument(connection.getHost().startsWith("http"),
206        "http client host must start with http or https");
207      String url = connection.getHost() + ":" + connection.getPort();
208      try {
209        THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
210        for (Map.Entry<String, String> header : customHeader.entrySet()) {
211          httpClient.setCustomHeader(header.getKey(), header.getValue());
212        }
213        httpClient.open();
214        TProtocol prot = new TBinaryProtocol(httpClient);
215        THBaseService.Client client = new THBaseService.Client(prot);
216        return new Pair<>(client, httpClient);
217      } catch (TTransportException e) {
218        throw new IOException(e);
219      }
220
221    }
222  }
223
224  /**
225   * Get a ThriftAdmin, ThriftAdmin is NOT thread safe
226   * @return a ThriftAdmin
227   * @throws IOException IOException
228   */
229  @Override
230  public Admin getAdmin() throws IOException {
231    Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
232    return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
233  }
234
235  public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
236    private long pause;
237
238    public DelayRetryHandler(int retryCount, long pause) {
239      super(retryCount, true, Arrays.asList(InterruptedIOException.class,
240        UnknownHostException.class, SSLException.class));
241      this.pause = pause;
242    }
243
244    @Override
245    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
246      // Don't sleep for retrying the first time
247      if (executionCount > 1 && pause > 0) {
248        try {
249          long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
250          Thread.sleep(sleepTime);
251        } catch (InterruptedException ie) {
252          // reset interrupt marker
253          Thread.currentThread().interrupt();
254        }
255      }
256      return super.retryRequest(exception, executionCount, context);
257    }
258
259    @Override
260    protected boolean handleAsIdempotent(HttpRequest request) {
261      return true;
262    }
263  }
264
265  public synchronized HttpClient getHttpClient() {
266    if (httpClient != null) {
267      return httpClient;
268    }
269    int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
270      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
271    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
272    HttpClientBuilder builder = HttpClientBuilder.create();
273    RequestConfig.Builder requestBuilder = RequestConfig.custom();
274    requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
275    requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
276    builder.setRetryHandler(new DelayRetryHandler(retry, pause));
277    builder.setDefaultRequestConfig(requestBuilder.build());
278    httpClient = builder.build();
279    httpClientCreated = true;
280    return httpClient;
281  }
282
283  @Override
284  public synchronized void close() throws IOException {
285    if (httpClient != null && httpClientCreated) {
286      HttpClientUtils.closeQuietly(httpClient);
287    }
288    isClosed = true;
289  }
290
291  @Override
292  public boolean isClosed() {
293    return isClosed;
294  }
295
296  /**
297   * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe
298   * @return a TableBuilder
299   * @throws IOException IOException
300   */
301  @Override
302  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
303    return new TableBuilder() {
304      @Override
305      public TableBuilder setOperationTimeout(int timeout) {
306        return this;
307      }
308
309      @Override
310      public TableBuilder setRpcTimeout(int timeout) {
311        return this;
312      }
313
314      @Override
315      public TableBuilder setReadRpcTimeout(int timeout) {
316        return this;
317      }
318
319      @Override
320      public TableBuilder setWriteRpcTimeout(int timeout) {
321        return this;
322      }
323
324      @Override
325      public TableBuilder setRequestAttribute(String key, byte[] value) {
326        return this;
327      }
328
329      @Override
330      public Table build() {
331        try {
332          Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
333          return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
334        } catch (IOException ioE) {
335          throw new RuntimeException(ioE);
336        }
337      }
338    };
339  }
340
341  @Override
342  public void abort(String why, Throwable e) {
343
344  }
345
346  @Override
347  public boolean isAborted() {
348    return false;
349  }
350
351  @Override
352  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
353    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
354  }
355
356  @Override
357  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
358    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
359  }
360
361  @Override
362  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
363    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
364  }
365
366  @Override
367  public void clearRegionLocationCache() {
368    throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable");
369  }
370
371  @Override
372  public String getClusterId() {
373    try {
374      Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
375      return client.getFirst().getClusterId();
376    } catch (TException | IOException e) {
377      LOG.error("Error fetching cluster ID: ", e);
378    }
379    return null;
380  }
381}