001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
053
054import java.util.concurrent.TimeUnit;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Timeout configs.
063 */
064@InterfaceAudience.Private
065class AsyncConnectionConfiguration {
066
067  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
068
069  /**
070   * Parameter name for client pause when server is overloaded, denoted by
071   * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()}
072   */
073  public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED =
074    "hbase.client.pause.server.overloaded";
075
076  static {
077    // This is added where the configs are referenced. It may be too late to happen before
078    // any user _sets_ the old cqtbe config onto a Configuration option. So we still need
079    // to handle checking both properties in parsing below. The benefit of calling this is
080    // that it should still cause Configuration to log a warning if we do end up falling
081    // through to the old deprecated config.
082    Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
083      HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
084  }
085
086  /**
087   * Configure the number of failures after which the client will start logging. A few failures is
088   * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
089   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this
090   * stage.
091   */
092  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
093    "hbase.client.start.log.errors.counter";
094  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
095
096  private final long metaOperationTimeoutNs;
097
098  // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
099  // by this value, see scanTimeoutNs.
100  private final long operationTimeoutNs;
101
102  // timeout for each rpc request. Can be overridden by a more specific config, such as
103  // readRpcTimeout or writeRpcTimeout.
104  private final long rpcTimeoutNs;
105
106  // timeout for each read rpc request
107  private final long readRpcTimeoutNs;
108
109  // timeout for each read rpc request against system tables
110  private final long metaReadRpcTimeoutNs;
111
112  // timeout for each write rpc request
113  private final long writeRpcTimeoutNs;
114
115  private final long pauseNs;
116
117  private final long pauseNsForServerOverloaded;
118
119  private final int maxRetries;
120
121  /** How many retries are allowed before we start to log */
122  private final int startLogErrorsCnt;
123
124  // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
125  // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
126  // client that it is still alive. The scan timeout is used as operation timeout for every
127  // operations in a scan, such as openScanner or next.
128  private final long scanTimeoutNs;
129  private final long metaScanTimeoutNs;
130
131  private final int scannerCaching;
132
133  private final int metaScannerCaching;
134
135  private final long scannerMaxResultSize;
136
137  private final long writeBufferSize;
138
139  private final long writeBufferPeriodicFlushTimeoutNs;
140
141  // this is for supporting region replica get, if the primary does not finished within this
142  // timeout, we will send request to secondaries.
143  private final long primaryCallTimeoutNs;
144
145  private final long primaryScanTimeoutNs;
146
147  private final long primaryMetaScanTimeoutNs;
148
149  private final int maxKeyValueSize;
150
151  AsyncConnectionConfiguration(Configuration conf) {
152    long operationTimeoutMs =
153      conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
154    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(operationTimeoutMs);
155    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS
156      .toNanos(conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeoutMs));
157    long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
158    this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
159    long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs);
160    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
161    this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
162      .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
163    this.writeRpcTimeoutNs =
164      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
165    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
166    long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
167      conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
168    if (pauseMsForServerOverloaded < pauseMs) {
169      LOG.warn(
170        "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
171        HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE,
172        pauseMs);
173      pauseMsForServerOverloaded = pauseMs;
174    }
175    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
176    this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded);
177    this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
178    this.startLogErrorsCnt =
179      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
180    long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
181      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
182    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
183    this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
184      .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
185    this.scannerCaching =
186      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
187    this.metaScannerCaching =
188      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
189    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
190      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
191    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
192    this.writeBufferPeriodicFlushTimeoutNs =
193      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
194        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
195    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
196      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
197    this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
198      conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
199    this.primaryMetaScanTimeoutNs =
200      TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
201        HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
202    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
203  }
204
205  long getMetaOperationTimeoutNs() {
206    return metaOperationTimeoutNs;
207  }
208
209  long getOperationTimeoutNs() {
210    return operationTimeoutNs;
211  }
212
213  long getRpcTimeoutNs() {
214    return rpcTimeoutNs;
215  }
216
217  long getReadRpcTimeoutNs() {
218    return readRpcTimeoutNs;
219  }
220
221  long getMetaReadRpcTimeoutNs() {
222    return metaReadRpcTimeoutNs;
223  }
224
225  long getWriteRpcTimeoutNs() {
226    return writeRpcTimeoutNs;
227  }
228
229  long getPauseNs() {
230    return pauseNs;
231  }
232
233  long getPauseNsForServerOverloaded() {
234    return pauseNsForServerOverloaded;
235  }
236
237  int getMaxRetries() {
238    return maxRetries;
239  }
240
241  int getStartLogErrorsCnt() {
242    return startLogErrorsCnt;
243  }
244
245  long getScanTimeoutNs() {
246    return scanTimeoutNs;
247  }
248
249  long getMetaScanTimeoutNs() {
250    return metaScanTimeoutNs;
251  }
252
253  int getScannerCaching() {
254    return scannerCaching;
255  }
256
257  int getMetaScannerCaching() {
258    return metaScannerCaching;
259  }
260
261  long getScannerMaxResultSize() {
262    return scannerMaxResultSize;
263  }
264
265  long getWriteBufferSize() {
266    return writeBufferSize;
267  }
268
269  long getWriteBufferPeriodicFlushTimeoutNs() {
270    return writeBufferPeriodicFlushTimeoutNs;
271  }
272
273  long getPrimaryCallTimeoutNs() {
274    return primaryCallTimeoutNs;
275  }
276
277  long getPrimaryScanTimeoutNs() {
278    return primaryScanTimeoutNs;
279  }
280
281  long getPrimaryMetaScanTimeoutNs() {
282    return primaryMetaScanTimeoutNs;
283  }
284
285  int getMaxKeyValueSize() {
286    return maxKeyValueSize;
287  }
288}