001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY;
043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
053import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
054import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
055
056import java.util.concurrent.TimeUnit;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * Timeout configs.
065 */
066@InterfaceAudience.Private
067class AsyncConnectionConfiguration {
068
069  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
070
071  /**
072   * Parameter name for client pause when server is overloaded, denoted by
073   * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()}
074   */
075  public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED =
076    "hbase.client.pause.server.overloaded";
077
078  static {
079    // This is added where the configs are referenced. It may be too late to happen before
080    // any user _sets_ the old cqtbe config onto a Configuration option. So we still need
081    // to handle checking both properties in parsing below. The benefit of calling this is
082    // that it should still cause Configuration to log a warning if we do end up falling
083    // through to the old deprecated config.
084    Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
085      HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
086  }
087
088  /**
089   * Configure the number of failures after which the client will start logging. A few failures is
090   * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
091   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this
092   * stage.
093   */
094  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
095    "hbase.client.start.log.errors.counter";
096  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
097
098  private final long metaOperationTimeoutNs;
099
100  // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
101  // by this value, see scanTimeoutNs.
102  private final long operationTimeoutNs;
103
104  // timeout for each rpc request. Can be overridden by a more specific config, such as
105  // readRpcTimeout or writeRpcTimeout.
106  private final long rpcTimeoutNs;
107
108  // timeout for each read rpc request
109  private final long readRpcTimeoutNs;
110
111  // timeout for each read rpc request against system tables
112  private final long metaReadRpcTimeoutNs;
113
114  // timeout for each write rpc request
115  private final long writeRpcTimeoutNs;
116
117  private final long pauseNs;
118
119  private final long pauseNsForServerOverloaded;
120
121  private final int maxRetries;
122
123  /** How many retries are allowed before we start to log */
124  private final int startLogErrorsCnt;
125
126  // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
127  // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
128  // client that it is still alive. The scan timeout is used as operation timeout for every
129  // operations in a scan, such as openScanner or next.
130  private final long scanTimeoutNs;
131  private final long metaScanTimeoutNs;
132
133  private final int scannerCaching;
134
135  private final int metaScannerCaching;
136
137  private final long scannerMaxResultSize;
138
139  private final long writeBufferSize;
140
141  private final long writeBufferPeriodicFlushTimeoutNs;
142
143  // this is for supporting region replica get, if the primary does not finished within this
144  // timeout, we will send request to secondaries.
145  private final long primaryCallTimeoutNs;
146
147  private final long primaryScanTimeoutNs;
148
149  private final long primaryMetaScanTimeoutNs;
150
151  private final int maxKeyValueSize;
152
153  private final int bufferedMutatorMaxMutations;
154
155  AsyncConnectionConfiguration(Configuration conf) {
156    long operationTimeoutMs =
157      conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
158    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(operationTimeoutMs);
159    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS
160      .toNanos(conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeoutMs));
161    long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
162    this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
163    long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs);
164    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
165    this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
166      .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
167    this.writeRpcTimeoutNs =
168      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
169    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
170    long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
171      conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
172    if (pauseMsForServerOverloaded < pauseMs) {
173      LOG.warn(
174        "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
175        HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE,
176        pauseMs);
177      pauseMsForServerOverloaded = pauseMs;
178    }
179    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
180    this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded);
181    this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
182    this.startLogErrorsCnt =
183      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
184    long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
185      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
186    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
187    this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
188      .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
189    this.scannerCaching =
190      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
191    this.metaScannerCaching =
192      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
193    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
194      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
195    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
196    this.writeBufferPeriodicFlushTimeoutNs =
197      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
198        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
199    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
200      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
201    this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
202      conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
203    this.primaryMetaScanTimeoutNs =
204      TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
205        HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
206    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
207    this.bufferedMutatorMaxMutations = conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY,
208      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT));
209  }
210
211  long getMetaOperationTimeoutNs() {
212    return metaOperationTimeoutNs;
213  }
214
215  long getOperationTimeoutNs() {
216    return operationTimeoutNs;
217  }
218
219  long getRpcTimeoutNs() {
220    return rpcTimeoutNs;
221  }
222
223  long getReadRpcTimeoutNs() {
224    return readRpcTimeoutNs;
225  }
226
227  long getMetaReadRpcTimeoutNs() {
228    return metaReadRpcTimeoutNs;
229  }
230
231  long getWriteRpcTimeoutNs() {
232    return writeRpcTimeoutNs;
233  }
234
235  long getPauseNs() {
236    return pauseNs;
237  }
238
239  long getPauseNsForServerOverloaded() {
240    return pauseNsForServerOverloaded;
241  }
242
243  int getMaxRetries() {
244    return maxRetries;
245  }
246
247  int getStartLogErrorsCnt() {
248    return startLogErrorsCnt;
249  }
250
251  long getScanTimeoutNs() {
252    return scanTimeoutNs;
253  }
254
255  long getMetaScanTimeoutNs() {
256    return metaScanTimeoutNs;
257  }
258
259  int getScannerCaching() {
260    return scannerCaching;
261  }
262
263  int getMetaScannerCaching() {
264    return metaScannerCaching;
265  }
266
267  long getScannerMaxResultSize() {
268    return scannerMaxResultSize;
269  }
270
271  long getWriteBufferSize() {
272    return writeBufferSize;
273  }
274
275  long getWriteBufferPeriodicFlushTimeoutNs() {
276    return writeBufferPeriodicFlushTimeoutNs;
277  }
278
279  long getPrimaryCallTimeoutNs() {
280    return primaryCallTimeoutNs;
281  }
282
283  long getPrimaryScanTimeoutNs() {
284    return primaryScanTimeoutNs;
285  }
286
287  long getPrimaryMetaScanTimeoutNs() {
288    return primaryMetaScanTimeoutNs;
289  }
290
291  int getMaxKeyValueSize() {
292    return maxKeyValueSize;
293  }
294
295  int getBufferedMutatorMaxMutations() {
296    return bufferedMutatorMaxMutations;
297  }
298}