001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.util.ReflectionUtils;
023import org.apache.yetus.audience.InterfaceAudience;
024
025/**
026 * Factory to create an {@link RpcRetryingCaller}
027 */
028@InterfaceAudience.Private
029public class RpcRetryingCallerFactory {
030
031  /** Configuration key for a custom {@link RpcRetryingCaller} */
032  public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
033  private final ConnectionConfiguration connectionConf;
034  private final RetryingCallerInterceptor interceptor;
035  private final int startLogErrorsCnt;
036  private final MetricsConnection metrics;
037
038  public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
039    this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
040  }
041
042  public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
043    RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
044    this.connectionConf = connectionConf;
045    startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
046      AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
047    this.interceptor = interceptor;
048    this.metrics = metrics;
049  }
050
051  /**
052   * Create a new RetryingCaller with specific rpc timeout.
053   */
054  public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
055    // We store the values in the factory instance. This way, constructing new objects
056    // is cheap as it does not require parsing a complex structure.
057    return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
058      connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
059      interceptor, startLogErrorsCnt, rpcTimeout, metrics);
060  }
061
062  /**
063   * Create a new RetryingCaller with configured rpc timeout.
064   */
065  public <T> RpcRetryingCaller<T> newCaller() {
066    // We store the values in the factory instance. This way, constructing new objects
067    // is cheap as it does not require parsing a complex structure.
068    return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
069      connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
070      interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
071  }
072
073  @RestrictedApi(explanation = "Should only be called on process initialization", link = "",
074      allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java")
075  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
076    MetricsConnection metrics) {
077    return instantiate(configuration, new ConnectionConfiguration(configuration), metrics);
078  }
079
080  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
081    ConnectionConfiguration connectionConf, MetricsConnection metrics) {
082    return instantiate(configuration, connectionConf,
083      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
084  }
085
086  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
087    ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
088    MetricsConnection metrics) {
089    return instantiate(configuration, connectionConf,
090      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
091  }
092
093  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
094    ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
095    ServerStatisticTracker stats, MetricsConnection metrics) {
096    String clazzName = RpcRetryingCallerFactory.class.getName();
097    String rpcCallerFactoryClazz =
098      configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
099    RpcRetryingCallerFactory factory;
100    if (rpcCallerFactoryClazz.equals(clazzName)) {
101      factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics);
102    } else {
103      factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
104        new Class[] { Configuration.class, ConnectionConfiguration.class },
105        new Object[] { configuration, connectionConf });
106    }
107    return factory;
108  }
109}