001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.security.PrivilegedExceptionAction;
025import java.util.Collections;
026import java.util.Map;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutorService;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.AuthUtil;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.security.UserProvider;
034import org.apache.hadoop.hbase.trace.TraceUtil;
035import org.apache.hadoop.hbase.util.ReflectionUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
040 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
041 * {@link Connection}, {@link Table} implementations are retrieved with
042 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
043 *
044 * <pre>
045 * Connection connection = ConnectionFactory.createConnection(config);
046 * Table table = connection.getTable(TableName.valueOf("table1"));
047 * try {
048 *   // Use the table as needed, for a single operation and a single thread
049 * } finally {
050 *   table.close();
051 *   connection.close();
052 * }
053 * </pre>
054 *
055 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
056 * credentials if caller has following two configurations set:
057 * <ul>
058 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
059 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
060 * </ul>
061 * By this way, caller can directly connect to kerberized cluster without caring login and
062 * credentials renewal logic in application.
063 *
064 * <pre>
065 * </pre>
066 *
067 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
068 * implementations.
069 * @see Connection
070 * @since 0.99.0
071 */
072@InterfaceAudience.Public
073public class ConnectionFactory {
074
075  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
076    "hbase.client.async.connection.impl";
077
078  /** No public c.tors */
079  protected ConnectionFactory() {
080  }
081
082  /**
083   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
084   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
085   * connection share zookeeper connection, meta cache, and connections to region servers and
086   * masters. <br>
087   * The caller is responsible for calling {@link Connection#close()} on the returned connection
088   * instance. Typical usage:
089   *
090   * <pre>
091   * Connection connection = ConnectionFactory.createConnection();
092   * Table table = connection.getTable(TableName.valueOf("mytable"));
093   * try {
094   *   table.get(...);
095   *   ...
096   * } finally {
097   *   table.close();
098   *   connection.close();
099   * }
100   * </pre>
101   *
102   * @return Connection object for <code>conf</code>
103   */
104  public static Connection createConnection() throws IOException {
105    Configuration conf = HBaseConfiguration.create();
106    return createConnection(conf, null, AuthUtil.loginClient(conf), Collections.emptyMap());
107  }
108
109  /**
110   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
111   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
112   * created from returned connection share zookeeper connection, meta cache, and connections to
113   * region servers and masters. <br>
114   * The caller is responsible for calling {@link Connection#close()} on the returned connection
115   * instance. Typical usage:
116   *
117   * <pre>
118   * Connection connection = ConnectionFactory.createConnection(conf);
119   * Table table = connection.getTable(TableName.valueOf("mytable"));
120   * try {
121   *   table.get(...);
122   *   ...
123   * } finally {
124   *   table.close();
125   *   connection.close();
126   * }
127   * </pre>
128   *
129   * @param conf configuration
130   * @return Connection object for <code>conf</code>
131   */
132  public static Connection createConnection(Configuration conf) throws IOException {
133    return createConnection(conf, null, AuthUtil.loginClient(conf), Collections.emptyMap());
134  }
135
136  /**
137   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
138   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
139   * created from returned connection share zookeeper connection, meta cache, and connections to
140   * region servers and masters. <br>
141   * The caller is responsible for calling {@link Connection#close()} on the returned connection
142   * instance. Typical usage:
143   *
144   * <pre>
145   * Connection connection = ConnectionFactory.createConnection(conf);
146   * Table table = connection.getTable(TableName.valueOf("mytable"));
147   * try {
148   *   table.get(...);
149   *   ...
150   * } finally {
151   *   table.close();
152   *   connection.close();
153   * }
154   * </pre>
155   *
156   * @param conf configuration
157   * @param pool the thread pool to use for batch operations
158   * @return Connection object for <code>conf</code>
159   */
160  public static Connection createConnection(Configuration conf, ExecutorService pool)
161    throws IOException {
162    return createConnection(conf, pool, AuthUtil.loginClient(conf), Collections.emptyMap());
163  }
164
165  /**
166   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
167   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
168   * created from returned connection share zookeeper connection, meta cache, and connections to
169   * region servers and masters. <br>
170   * The caller is responsible for calling {@link Connection#close()} on the returned connection
171   * instance. Typical usage:
172   *
173   * <pre>
174   * Connection connection = ConnectionFactory.createConnection(conf);
175   * Table table = connection.getTable(TableName.valueOf("table1"));
176   * try {
177   *   table.get(...);
178   *   ...
179   * } finally {
180   *   table.close();
181   *   connection.close();
182   * }
183   * </pre>
184   *
185   * @param conf configuration
186   * @param user the user the connection is for
187   * @return Connection object for <code>conf</code>
188   */
189  public static Connection createConnection(Configuration conf, User user) throws IOException {
190    return createConnection(conf, null, user, Collections.emptyMap());
191  }
192
193  /**
194   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
195   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
196   * created from returned connection share zookeeper connection, meta cache, and connections to
197   * region servers and masters. <br>
198   * The caller is responsible for calling {@link Connection#close()} on the returned connection
199   * instance. Typical usage:
200   *
201   * <pre>
202   * Connection connection = ConnectionFactory.createConnection(conf);
203   * Table table = connection.getTable(TableName.valueOf("table1"));
204   * try {
205   *   table.get(...);
206   *   ...
207   * } finally {
208   *   table.close();
209   *   connection.close();
210   * }
211   * </pre>
212   *
213   * @param conf configuration
214   * @param user the user the connection is for
215   * @param pool the thread pool to use for batch operations
216   * @return Connection object for <code>conf</code>
217   */
218  public static Connection createConnection(Configuration conf, ExecutorService pool,
219    final User user) throws IOException {
220    return createConnection(conf, pool, user, Collections.emptyMap());
221  }
222
223  /**
224   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
225   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
226   * created from returned connection share zookeeper connection, meta cache, and connections to
227   * region servers and masters. <br>
228   * The caller is responsible for calling {@link Connection#close()} on the returned connection
229   * instance. Typical usage:
230   *
231   * <pre>
232   * Connection connection = ConnectionFactory.createConnection(conf);
233   * Table table = connection.getTable(TableName.valueOf("table1"));
234   * try {
235   *   table.get(...);
236   *   ...
237   * } finally {
238   *   table.close();
239   *   connection.close();
240   * }
241   * </pre>
242   *
243   * @param conf                 configuration
244   * @param user                 the user the connection is for
245   * @param pool                 the thread pool to use for batch operations
246   * @param connectionAttributes attributes to be sent along to server during connection establish
247   * @return Connection object for <code>conf</code>
248   */
249  public static Connection createConnection(Configuration conf, ExecutorService pool,
250    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
251    return TraceUtil.trace(() -> {
252      String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
253        ConnectionImplementation.class.getName());
254      Class<?> clazz;
255      try {
256        clazz = Class.forName(className);
257      } catch (ClassNotFoundException e) {
258        throw new IOException(e);
259      }
260      try {
261        // Default HCM#HCI is not accessible; make it so before invoking.
262        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
263          ExecutorService.class, User.class, Map.class);
264        constructor.setAccessible(true);
265        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
266          .newInstance(conf, pool, user, connectionAttributes));
267      } catch (Exception e) {
268        throw new IOException(e);
269      }
270    }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
271  }
272
273  /**
274   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
275   * @see #createAsyncConnection(Configuration)
276   * @return AsyncConnection object wrapped by CompletableFuture
277   */
278  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
279    return createAsyncConnection(HBaseConfiguration.create());
280  }
281
282  /**
283   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
284   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
285   * initialize the {@link UserProvider}.
286   * @param conf configuration
287   * @return AsyncConnection object wrapped by CompletableFuture
288   * @see #createAsyncConnection(Configuration, User)
289   * @see UserProvider
290   */
291  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
292    User user;
293    try {
294      user = AuthUtil.loginClient(conf);
295    } catch (IOException e) {
296      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
297      future.completeExceptionally(e);
298      return future;
299    }
300    return createAsyncConnection(conf, user);
301  }
302
303  /**
304   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
305   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
306   * interfaces created from returned connection share zookeeper connection, meta cache, and
307   * connections to region servers and masters.
308   * <p>
309   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
310   * connection instance.
311   * <p>
312   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
313   * as it is thread safe.
314   * @param conf configuration
315   * @param user the user the asynchronous connection is for
316   * @return AsyncConnection object wrapped by CompletableFuture
317   */
318  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
319    final User user) {
320    return createAsyncConnection(conf, user, null);
321  }
322
323  /**
324   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
325   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
326   * interfaces created from returned connection share zookeeper connection, meta cache, and
327   * connections to region servers and masters.
328   * <p>
329   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
330   * connection instance.
331   * <p>
332   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
333   * as it is thread safe.
334   * @param conf                 configuration
335   * @param user                 the user the asynchronous connection is for
336   * @param connectionAttributes attributes to be sent along to server during connection establish
337   * @return AsyncConnection object wrapped by CompletableFuture
338   */
339  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
340    final User user, Map<String, byte[]> connectionAttributes) {
341    return TraceUtil.tracedFuture(() -> {
342      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
343      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user);
344      addListener(registry.getClusterId(), (clusterId, error) -> {
345        if (error != null) {
346          registry.close();
347          future.completeExceptionally(error);
348          return;
349        }
350        if (clusterId == null) {
351          registry.close();
352          future.completeExceptionally(new IOException("clusterid came back null"));
353          return;
354        }
355        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
356          AsyncConnectionImpl.class, AsyncConnection.class);
357        try {
358          future.complete(
359            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
360              .newInstance(clazz, conf, registry, clusterId, user, connectionAttributes)));
361        } catch (Exception e) {
362          registry.close();
363          future.completeExceptionally(e);
364        }
365      });
366      return future;
367    }, "ConnectionFactory.createAsyncConnection");
368  }
369}