001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.util.ReflectionUtils; 023import org.apache.yetus.audience.InterfaceAudience; 024 025/** 026 * Factory to create an {@link RpcRetryingCaller} 027 */ 028@InterfaceAudience.Private 029public class RpcRetryingCallerFactory { 030 031 /** Configuration key for a custom {@link RpcRetryingCaller} */ 032 public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; 033 private final ConnectionConfiguration connectionConf; 034 private final RetryingCallerInterceptor interceptor; 035 private final int startLogErrorsCnt; 036 private final MetricsConnection metrics; 037 038 public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) { 039 this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); 040 } 041 042 public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf, 043 RetryingCallerInterceptor interceptor, MetricsConnection metrics) { 044 this.connectionConf = connectionConf; 045 startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, 046 AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 047 this.interceptor = interceptor; 048 this.metrics = metrics; 049 } 050 051 /** 052 * Create a new RetryingCaller with specific rpc timeout. 053 */ 054 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 055 // We store the values in the factory instance. This way, constructing new objects 056 // is cheap as it does not require parsing a complex structure. 057 return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), 058 connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), 059 interceptor, startLogErrorsCnt, rpcTimeout, metrics); 060 } 061 062 /** 063 * Create a new RetryingCaller with configured rpc timeout. 064 */ 065 public <T> RpcRetryingCaller<T> newCaller() { 066 // We store the values in the factory instance. This way, constructing new objects 067 // is cheap as it does not require parsing a complex structure. 068 return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), 069 connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), 070 interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); 071 } 072 073 @RestrictedApi(explanation = "Should only be called on process initialization", link = "", 074 allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java") 075 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 076 MetricsConnection metrics) { 077 return instantiate(configuration, new ConnectionConfiguration(configuration), metrics); 078 } 079 080 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 081 ConnectionConfiguration connectionConf, MetricsConnection metrics) { 082 return instantiate(configuration, connectionConf, 083 RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics); 084 } 085 086 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 087 ConnectionConfiguration connectionConf, ServerStatisticTracker stats, 088 MetricsConnection metrics) { 089 return instantiate(configuration, connectionConf, 090 RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics); 091 } 092 093 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 094 ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor, 095 ServerStatisticTracker stats, MetricsConnection metrics) { 096 String clazzName = RpcRetryingCallerFactory.class.getName(); 097 String rpcCallerFactoryClazz = 098 configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); 099 RpcRetryingCallerFactory factory; 100 if (rpcCallerFactoryClazz.equals(clazzName)) { 101 factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics); 102 } else { 103 factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, 104 new Class[] { Configuration.class, ConnectionConfiguration.class }, 105 new Object[] { configuration, connectionConf }); 106 } 107 return factory; 108 } 109}