001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; 023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; 024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; 025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; 027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; 028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; 029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT; 030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; 031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; 032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; 033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; 034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; 035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; 036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; 038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; 039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; 040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; 041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; 042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; 043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; 044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY; 045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; 046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; 047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND; 048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT; 049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; 050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; 051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; 052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY; 053 054import java.util.concurrent.TimeUnit; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Timeout configs. 063 */ 064@InterfaceAudience.Private 065class AsyncConnectionConfiguration { 066 067 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); 068 069 /** 070 * Parameter name for client pause when server is overloaded, denoted by 071 * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()} 072 */ 073 public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = 074 "hbase.client.pause.server.overloaded"; 075 076 static { 077 // This is added where the configs are referenced. It may be too late to happen before 078 // any user _sets_ the old cqtbe config onto a Configuration option. So we still need 079 // to handle checking both properties in parsing below. The benefit of calling this is 080 // that it should still cause Configuration to log a warning if we do end up falling 081 // through to the old deprecated config. 082 Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 083 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); 084 } 085 086 /** 087 * Configure the number of failures after which the client will start logging. A few failures is 088 * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable 089 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this 090 * stage. 091 */ 092 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = 093 "hbase.client.start.log.errors.counter"; 094 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5; 095 096 private final long metaOperationTimeoutNs; 097 098 // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected 099 // by this value, see scanTimeoutNs. 100 private final long operationTimeoutNs; 101 102 // timeout for each rpc request. Can be overridden by a more specific config, such as 103 // readRpcTimeout or writeRpcTimeout. 104 private final long rpcTimeoutNs; 105 106 // timeout for each read rpc request 107 private final long readRpcTimeoutNs; 108 109 // timeout for each read rpc request against system tables 110 private final long metaReadRpcTimeoutNs; 111 112 // timeout for each write rpc request 113 private final long writeRpcTimeoutNs; 114 115 private final long pauseNs; 116 117 private final long pauseNsForServerOverloaded; 118 119 private final int maxRetries; 120 121 /** How many retries are allowed before we start to log */ 122 private final int startLogErrorsCnt; 123 124 // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is 125 // crash. The RS will always return something before the rpc timeout or scan timeout to tell the 126 // client that it is still alive. The scan timeout is used as operation timeout for every 127 // operations in a scan, such as openScanner or next. 128 private final long scanTimeoutNs; 129 private final long metaScanTimeoutNs; 130 131 private final int scannerCaching; 132 133 private final int metaScannerCaching; 134 135 private final long scannerMaxResultSize; 136 137 private final long writeBufferSize; 138 139 private final long writeBufferPeriodicFlushTimeoutNs; 140 141 // this is for supporting region replica get, if the primary does not finished within this 142 // timeout, we will send request to secondaries. 143 private final long primaryCallTimeoutNs; 144 145 private final long primaryScanTimeoutNs; 146 147 private final long primaryMetaScanTimeoutNs; 148 149 private final int maxKeyValueSize; 150 151 AsyncConnectionConfiguration(Configuration conf) { 152 long operationTimeoutMs = 153 conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 154 this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(operationTimeoutMs); 155 this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS 156 .toNanos(conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeoutMs)); 157 long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); 158 this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs); 159 long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs); 160 this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis); 161 this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS 162 .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis)); 163 this.writeRpcTimeoutNs = 164 TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); 165 long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); 166 long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 167 conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); 168 if (pauseMsForServerOverloaded < pauseMs) { 169 LOG.warn( 170 "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", 171 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE, 172 pauseMs); 173 pauseMsForServerOverloaded = pauseMs; 174 } 175 this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); 176 this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded); 177 this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 178 this.startLogErrorsCnt = 179 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 180 long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 181 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 182 this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis); 183 this.metaScanTimeoutNs = TimeUnit.MILLISECONDS 184 .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis)); 185 this.scannerCaching = 186 conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); 187 this.metaScannerCaching = 188 conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); 189 this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 190 DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 191 this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); 192 this.writeBufferPeriodicFlushTimeoutNs = 193 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, 194 WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); 195 this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 196 conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); 197 this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 198 conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT)); 199 this.primaryMetaScanTimeoutNs = 200 TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, 201 HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); 202 this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); 203 } 204 205 long getMetaOperationTimeoutNs() { 206 return metaOperationTimeoutNs; 207 } 208 209 long getOperationTimeoutNs() { 210 return operationTimeoutNs; 211 } 212 213 long getRpcTimeoutNs() { 214 return rpcTimeoutNs; 215 } 216 217 long getReadRpcTimeoutNs() { 218 return readRpcTimeoutNs; 219 } 220 221 long getMetaReadRpcTimeoutNs() { 222 return metaReadRpcTimeoutNs; 223 } 224 225 long getWriteRpcTimeoutNs() { 226 return writeRpcTimeoutNs; 227 } 228 229 long getPauseNs() { 230 return pauseNs; 231 } 232 233 long getPauseNsForServerOverloaded() { 234 return pauseNsForServerOverloaded; 235 } 236 237 int getMaxRetries() { 238 return maxRetries; 239 } 240 241 int getStartLogErrorsCnt() { 242 return startLogErrorsCnt; 243 } 244 245 long getScanTimeoutNs() { 246 return scanTimeoutNs; 247 } 248 249 long getMetaScanTimeoutNs() { 250 return metaScanTimeoutNs; 251 } 252 253 int getScannerCaching() { 254 return scannerCaching; 255 } 256 257 int getMetaScannerCaching() { 258 return metaScannerCaching; 259 } 260 261 long getScannerMaxResultSize() { 262 return scannerMaxResultSize; 263 } 264 265 long getWriteBufferSize() { 266 return writeBufferSize; 267 } 268 269 long getWriteBufferPeriodicFlushTimeoutNs() { 270 return writeBufferPeriodicFlushTimeoutNs; 271 } 272 273 long getPrimaryCallTimeoutNs() { 274 return primaryCallTimeoutNs; 275 } 276 277 long getPrimaryScanTimeoutNs() { 278 return primaryScanTimeoutNs; 279 } 280 281 long getPrimaryMetaScanTimeoutNs() { 282 return primaryMetaScanTimeoutNs; 283 } 284 285 int getMaxKeyValueSize() { 286 return maxKeyValueSize; 287 } 288}