001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; 023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; 024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; 025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; 027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; 028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; 029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT; 030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; 031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; 032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; 033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; 034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; 035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; 036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; 038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; 039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; 040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; 041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT; 042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY; 043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; 044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; 045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; 046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY; 047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; 048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; 049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND; 050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT; 051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; 052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; 053import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; 054import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY; 055 056import java.util.concurrent.TimeUnit; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * Timeout configs. 065 */ 066@InterfaceAudience.Private 067class AsyncConnectionConfiguration { 068 069 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); 070 071 /** 072 * Parameter name for client pause when server is overloaded, denoted by 073 * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()} 074 */ 075 public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = 076 "hbase.client.pause.server.overloaded"; 077 078 static { 079 // This is added where the configs are referenced. It may be too late to happen before 080 // any user _sets_ the old cqtbe config onto a Configuration option. So we still need 081 // to handle checking both properties in parsing below. The benefit of calling this is 082 // that it should still cause Configuration to log a warning if we do end up falling 083 // through to the old deprecated config. 084 Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 085 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); 086 } 087 088 /** 089 * Configure the number of failures after which the client will start logging. A few failures is 090 * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable 091 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this 092 * stage. 093 */ 094 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = 095 "hbase.client.start.log.errors.counter"; 096 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5; 097 098 private final long metaOperationTimeoutNs; 099 100 // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected 101 // by this value, see scanTimeoutNs. 102 private final long operationTimeoutNs; 103 104 // timeout for each rpc request. Can be overridden by a more specific config, such as 105 // readRpcTimeout or writeRpcTimeout. 106 private final long rpcTimeoutNs; 107 108 // timeout for each read rpc request 109 private final long readRpcTimeoutNs; 110 111 // timeout for each read rpc request against system tables 112 private final long metaReadRpcTimeoutNs; 113 114 // timeout for each write rpc request 115 private final long writeRpcTimeoutNs; 116 117 private final long pauseNs; 118 119 private final long pauseNsForServerOverloaded; 120 121 private final int maxRetries; 122 123 /** How many retries are allowed before we start to log */ 124 private final int startLogErrorsCnt; 125 126 // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is 127 // crash. The RS will always return something before the rpc timeout or scan timeout to tell the 128 // client that it is still alive. The scan timeout is used as operation timeout for every 129 // operations in a scan, such as openScanner or next. 130 private final long scanTimeoutNs; 131 private final long metaScanTimeoutNs; 132 133 private final int scannerCaching; 134 135 private final int metaScannerCaching; 136 137 private final long scannerMaxResultSize; 138 139 private final long writeBufferSize; 140 141 private final long writeBufferPeriodicFlushTimeoutNs; 142 143 // this is for supporting region replica get, if the primary does not finished within this 144 // timeout, we will send request to secondaries. 145 private final long primaryCallTimeoutNs; 146 147 private final long primaryScanTimeoutNs; 148 149 private final long primaryMetaScanTimeoutNs; 150 151 private final int maxKeyValueSize; 152 153 private final int bufferedMutatorMaxMutations; 154 155 AsyncConnectionConfiguration(Configuration conf) { 156 long operationTimeoutMs = 157 conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 158 this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(operationTimeoutMs); 159 this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS 160 .toNanos(conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeoutMs)); 161 long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); 162 this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs); 163 long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs); 164 this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis); 165 this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS 166 .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis)); 167 this.writeRpcTimeoutNs = 168 TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); 169 long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); 170 long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 171 conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); 172 if (pauseMsForServerOverloaded < pauseMs) { 173 LOG.warn( 174 "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", 175 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE, 176 pauseMs); 177 pauseMsForServerOverloaded = pauseMs; 178 } 179 this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); 180 this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded); 181 this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 182 this.startLogErrorsCnt = 183 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 184 long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 185 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 186 this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis); 187 this.metaScanTimeoutNs = TimeUnit.MILLISECONDS 188 .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis)); 189 this.scannerCaching = 190 conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); 191 this.metaScannerCaching = 192 conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); 193 this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 194 DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 195 this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); 196 this.writeBufferPeriodicFlushTimeoutNs = 197 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, 198 WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); 199 this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 200 conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); 201 this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 202 conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT)); 203 this.primaryMetaScanTimeoutNs = 204 TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, 205 HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); 206 this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); 207 this.bufferedMutatorMaxMutations = conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, 208 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT)); 209 } 210 211 long getMetaOperationTimeoutNs() { 212 return metaOperationTimeoutNs; 213 } 214 215 long getOperationTimeoutNs() { 216 return operationTimeoutNs; 217 } 218 219 long getRpcTimeoutNs() { 220 return rpcTimeoutNs; 221 } 222 223 long getReadRpcTimeoutNs() { 224 return readRpcTimeoutNs; 225 } 226 227 long getMetaReadRpcTimeoutNs() { 228 return metaReadRpcTimeoutNs; 229 } 230 231 long getWriteRpcTimeoutNs() { 232 return writeRpcTimeoutNs; 233 } 234 235 long getPauseNs() { 236 return pauseNs; 237 } 238 239 long getPauseNsForServerOverloaded() { 240 return pauseNsForServerOverloaded; 241 } 242 243 int getMaxRetries() { 244 return maxRetries; 245 } 246 247 int getStartLogErrorsCnt() { 248 return startLogErrorsCnt; 249 } 250 251 long getScanTimeoutNs() { 252 return scanTimeoutNs; 253 } 254 255 long getMetaScanTimeoutNs() { 256 return metaScanTimeoutNs; 257 } 258 259 int getScannerCaching() { 260 return scannerCaching; 261 } 262 263 int getMetaScannerCaching() { 264 return metaScannerCaching; 265 } 266 267 long getScannerMaxResultSize() { 268 return scannerMaxResultSize; 269 } 270 271 long getWriteBufferSize() { 272 return writeBufferSize; 273 } 274 275 long getWriteBufferPeriodicFlushTimeoutNs() { 276 return writeBufferPeriodicFlushTimeoutNs; 277 } 278 279 long getPrimaryCallTimeoutNs() { 280 return primaryCallTimeoutNs; 281 } 282 283 long getPrimaryScanTimeoutNs() { 284 return primaryScanTimeoutNs; 285 } 286 287 long getPrimaryMetaScanTimeoutNs() { 288 return primaryMetaScanTimeoutNs; 289 } 290 291 int getMaxKeyValueSize() { 292 return maxKeyValueSize; 293 } 294 295 int getBufferedMutatorMaxMutations() { 296 return bufferedMutatorMaxMutations; 297 } 298}