/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Creates multiple threads that read and verify previously written data */
@InterfaceAudience.Private
public class MultiThreadedReader extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);

  /** Reader threads created by {@link #addReaderThreads(int)}. */
  protected Set<HBaseReaderThread> readers = new HashSet<>();

  /** A read requests verification when a random draw from [0, 100) is below this value. */
  private final double verifyPercent;

  /** Set once more than {@link #maxErrors} read errors were seen; stops the reader loops. */
  protected volatile boolean aborted;

  /** Optional concurrent writer; stays {@code null} unless {@link #linkToWriter} is called. */
  protected MultiThreadedWriterBase writer = null;

  /**
   * The number of keys verified in a sequence. This will never be larger than the total number of
   * keys in the range. The reader might also verify random keys when it catches up with the writer.
   */
  private final AtomicLong numUniqueKeysVerified = new AtomicLong();

  /**
   * Default maximum number of read errors to tolerate before shutting down all readers.
   */
  public static final int DEFAULT_MAX_ERRORS = 10;

  /**
   * Default "window" size between the last key written by the writer and the key that we attempt to
   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
   * read the highest key in the contiguous sequence of keys written by the writers.
   */
  public static final int DEFAULT_KEY_WINDOW = 0;

  /**
   * Default batch size for multigets
   */
  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)

  /** Number of non-empty results that were fetched with verification requested. */
  protected AtomicLong numKeysVerified = new AtomicLong(0);

  /** Number of results that failed verification, plus out-of-range key proposals. */
  protected AtomicLong numReadErrors = new AtomicLong(0);

  /** Number of keys whose read attempt ended in an {@link IOException}. */
  protected AtomicLong numReadFailures = new AtomicLong(0);

  /** Number of empty results that were flagged as expected by the caller. */
  protected AtomicLong nullResult = new AtomicLong(0);

  private int maxErrors = DEFAULT_MAX_ERRORS;
  private int keyWindow = DEFAULT_KEY_WINDOW;
  private int batchSize = DEFAULT_BATCH_SIZE;
  private int regionReplicaId = -1; // particular region replica id to do reads against if set

  /**
   * @param dataGen       generator passed to the superclass and used to build and verify reads
   * @param conf          configuration passed to the superclass
   * @param tableName     table to read from
   * @param verifyPercent threshold compared against a random draw from [0, 100) to decide whether
   *                      a read requests verification
   */
  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
    double verifyPercent) throws IOException {
    super(dataGen, conf, tableName, "R");
    this.verifyPercent = verifyPercent;
  }

  /**
   * Couples this reader with a concurrently running writer so that readers only request keys the
   * writer reports as written. Also switches on written-key tracking in the writer.
   * @param writer the writer to follow; must not be {@code null}
   */
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  /** Sets the number of read errors tolerated before all readers are aborted. */
  public void setMaxErrors(int maxErrors) {
    this.maxErrors = maxErrors;
  }

  /** Sets how many keys the readers stay behind the writer's last written key. */
  public void setKeyWindow(int keyWindow) {
    this.keyWindow = keyWindow;
  }

  /**
   * Sets the number of keys fetched per request. A value of 1 results in simple gets, larger values
   * in multi gets. Values below 1 are not supported: each reader stores at least one key per round
   * into an array of this size.
   */
  public void setMultiGetBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  /**
   * Sets the region replica to read from. Only values greater than zero change the generated gets
   * (see {@link HBaseReaderThread#createGet(long)}).
   */
  public void setRegionReplicaId(int regionReplicaId) {
    this.regionReplicaId = regionReplicaId;
  }

  /**
   * Starts the superclass machinery, then creates and starts {@code numThreads} reader threads over
   * the key range [{@code startKey}, {@code endKey}).
   */
  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    if (verbose) {
      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
    }

    addReaderThreads(numThreads);
    startThreads(readers);
  }

  /** Creates {@code numThreads} reader threads with ids 0..numThreads-1 and registers them. */
  protected void addReaderThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseReaderThread reader = createReaderThread(i);
      readers.add(reader);
    }
  }

  /** Creates one reader thread and installs the logging uncaught-exception handler on it. */
  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    HBaseReaderThread reader = new HBaseReaderThread(readerId);
    Threads.setLoggingUncaughtExceptionHandler(reader);
    return reader;
  }

  /**
   * A single reader. It walks the key range, picks the keys whose remainder modulo the thread count
   * equals its id, fetches them (singly or in batches) and checks the results.
   */
  public class HBaseReaderThread extends Thread {
    protected final int readerId;
    protected final Table table;

    /** The "current" key being read. Increases from startKey to endKey. */
    private long curKey;

    /** Time when the thread started */
    protected long startTimeMs;

    /** If we are ahead of the writer and reading a random key. */
    private boolean readingRandomKey;

    /** Ensures that only the first read failure of this thread is logged with a stack trace. */
    private boolean printExceptionTrace = true;

    /**
     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
     *                 be read by this thread
     */
    public HBaseReaderThread(int readerId) throws IOException {
      this.readerId = readerId;
      table = createTable();
      setName(getClass().getSimpleName() + "_" + readerId);
    }

    /** Obtains the table handle this thread reads from. */
    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }

    @Override
    public void run() {
      try {
        runReader();
      } finally {
        // Always release the table and report this thread as done, even if the reader threw.
        closeTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    /** Closes the table handle; a failure to close is logged and otherwise ignored. */
    protected void closeTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

    /**
     * Main loop: collects up to {@code batchSize} keys that belong to this reader, reads them, and
     * repeats until the end of the range is reached or the readers are aborted.
     */
    private void runReader() {
      if (verbose) {
        LOG.info("Started thread #" + readerId + " for reads...");
      }

      startTimeMs = EnvironmentEdgeManager.currentTime();
      curKey = startKey;
      // Scratch buffer reused for every batch; only the first numKeys slots are valid per round.
      long[] keysForThisReader = new long[batchSize];
      while (curKey < endKey && !aborted) {
        int readingRandomKeyStartIndex = -1;
        int numKeys = 0;
        // if multiGet, loop until we have the number of keys equal to the batch size
        do {
          long k = getNextKeyToRead();
          if (k < startKey || k >= endKey) {
            numReadErrors.incrementAndGet();
            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
          }
          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
            // Skip keys that this thread should not read, as well as the keys
            // that we know the writer failed to write.
            continue;
          }
          keysForThisReader[numKeys] = k;
          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
            // store the first index of a random read
            readingRandomKeyStartIndex = numKeys;
          }
          numKeys++;
        } while (numKeys < batchSize && curKey < endKey && !aborted);

        if (numKeys > 0) { // meaning there is some key to read
          // Hand over only the slots filled in this round. A partial batch (end of range or
          // abort) would otherwise re-read stale keys left over from the previous round, or
          // zero-initialised slots. A partial batch of exactly one key uses the simple get path.
          long[] keysToRead = numKeys == batchSize
            ? keysForThisReader
            : Arrays.copyOf(keysForThisReader, numKeys);
          readKey(keysToRead);
          // We have verified some unique key(s).
          numUniqueKeysVerified
            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
        }
      }
    }

    /**
     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
     * to read, subject to the "key window" constraint.
     */
    private long maxKeyWeCanRead() {
      // Take one snapshot of the writer's progress and derive the whole answer from it.
      long insertedUpToKey = writer.wroteUpToKey();
      if (insertedUpToKey >= endKey - 1) {
        // The writer has finished writing our range, so we can read any
        // key in the range.
        return endKey - 1;
      }
      return Math.min(endKey - 1, insertedUpToKey - keyWindow);
    }

    /**
     * Picks the next key to request. Without a writer, or while behind the writer, this is the
     * current key, which is then advanced. Once the reader has caught up with the writer, a random
     * already-readable key is returned instead, the current key is left untouched and
     * {@code readingRandomKey} is set.
     * @return a key in [startKey, endKey)
     */
    protected long getNextKeyToRead() {
      readingRandomKey = false;
      if (writer == null || curKey <= maxKeyWeCanRead()) {
        return curKey++;
      }

      // We caught up with the writer. See if we can read any keys at all.
      long maxKeyToRead;
      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
        // The writer has not written sufficient keys for us to be able to read
        // anything at all. Sleep a bit. This should only happen in the
        // beginning of a load test run.
        Threads.sleepWithoutInterrupt(50);
      }

      if (curKey <= maxKeyToRead) {
        // The writer wrote some keys, and we are now allowed to read our
        // current key.
        return curKey++;
      }

      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
      // Don't increment the current key -- we still have to try reading it
      // later. Set a flag to make sure that we don't count this key towards
      // the set of unique keys we have verified.
      readingRandomKey = true;
      // Use the bounded generator: Math.abs(nextLong()) is negative for Long.MIN_VALUE, which
      // would yield a key below startKey. The bound is at least 1 because the loop above only
      // exits once maxKeyToRead >= startKey.
      return startKey + ThreadLocalRandom.current().nextLong(maxKeyToRead - startKey + 1);
    }

    /**
     * Builds one get per key and executes them: a single key is fetched with a simple get, several
     * keys with one multi get. I/O failures are counted in {@code numReadFailures} and logged; only
     * the first failure of this thread is logged with its stack trace.
     * @param keysToRead the keys to fetch; every element is read
     * @return the gets that were built, in key order
     */
    private Get[] readKey(long[] keysToRead) {
      Random rand = ThreadLocalRandom.current();
      Get[] gets = new Get[keysToRead.length];
      int i = 0;
      for (long keyToRead : keysToRead) {
        try {
          gets[i] = createGet(keyToRead);
          if (keysToRead.length == 1) {
            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
          }
          i++;
        } catch (IOException e) {
          numReadFailures.addAndGet(1);
          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }
      if (keysToRead.length > 1) {
        try {
          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
        } catch (IOException e) {
          numReadFailures.addAndGet(gets.length);
          for (long keyToRead : keysToRead) {
            LOG.debug(
              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          }
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }
      return gets;
    }

    /**
     * Builds the get for one key: the row comes from the data generator, all of the generator's
     * column families are added, and the generator's {@code beforeGet} hook may replace the get.
     * If a region replica id greater than zero is configured, the get targets that replica with
     * timeline consistency.
     */
    protected Get createGet(long keyToRead) throws IOException {
      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
      // Only filled in verbose mode; used for the log line below.
      StringBuilder cfsString = new StringBuilder();
      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
      for (byte[] cf : columnFamilies) {
        get.addFamily(cf);
        if (verbose) {
          if (cfsString.length() > 0) {
            cfsString.append(", ");
          }
          cfsString.append("[").append(Bytes.toStringBinary(cf)).append("]");
        }
      }
      get = dataGenerator.beforeGet(keyToRead, get);
      if (regionReplicaId > 0) {
        get.setReplicaId(regionReplicaId);
        get.setConsistency(Consistency.TIMELINE);
      }
      if (verbose) {
        LOG.info("[{}] Querying key {}, cfs {}", readerId, keyToRead, cfsString.toString());
      }
      return get;
    }

    /**
     * Executes the given gets as one multi get, times the call and checks the results.
     * @param gets       the gets to execute
     * @param verify     whether verification is requested for the results
     * @param keysToRead the keys the gets were built from (not used by this implementation)
     */
    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
      // read the data
      long start = System.nanoTime();
      // Uses multi/batch gets
      Result[] results = table.get(Arrays.asList(gets));
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
    }

    /**
     * Executes a single get, times the call and checks the result.
     * @param get       the get to execute
     * @param verify    whether verification is requested for the result
     * @param keyToRead the key the get was built from (not used by this implementation)
     */
    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
      // read the data

      long start = System.nanoTime();
      // Uses simple get
      Result result = table.get(get);
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
    }

    /**
     * Adds the elapsed time (converted to milliseconds) and the number of gets to the shared
     * counters, then checks every result against the get at the same index.
     */
    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
      Result[] results, Table table, boolean isNullExpected) throws IOException {
      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
      numKeys.addAndGet(gets.length);
      int i = 0;
      for (Result result : results) {
        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
          isNullExpected);
      }
    }

    /** Single-get variant; wraps the arguments into arrays and delegates to the array variant. */
    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
      Result result, Table table, boolean isNullExpected) throws IOException {
      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
        table, isNullExpected);
    }

    /**
     * Checks one result. An empty result is logged together with the region location of its row
     * and, if it was expected, only counted in {@code nullResult}. Every other result is checked
     * against the data generator: a passing result adds its column count to {@code numCols}, a
     * failing one increments {@code numReadErrors}. Once that counter exceeds the configured
     * maximum, all readers are aborted.
     */
    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
      Table table, boolean isNullExpected) throws IOException {
      if (!result.isEmpty()) {
        if (verify) {
          numKeysVerified.incrementAndGet();
        }
      } else {
        HRegionLocation hloc;
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          hloc = locator.getRegionLocation(get.getRow());
        }
        String rowKey = Bytes.toString(get.getRow());
        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
        if (isNullExpected) {
          nullResult.incrementAndGet();
          LOG.debug("Null result obtained for the key =" + rowKey);
          return;
        }
      }
      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
      long numErrorsAfterThis = 0;
      if (isOk) {
        long cols = 0;
        // Count the columns for reporting purposes.
        for (byte[] cf : result.getMap().keySet()) {
          cols += result.getFamilyMap(cf).size();
        }
        numCols.addAndGet(cols);
      } else {
        if (writer != null) {
          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
        }
        numErrorsAfterThis = numReadErrors.incrementAndGet();
      }

      if (numErrorsAfterThis > maxErrors) {
        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
        aborted = true;
      }
    }
  }

  /** Returns the number of keys whose read attempt ended in an {@link IOException}. */
  public long getNumReadFailures() {
    return numReadFailures.get();
  }

  /** Returns the number of read errors recorded so far. */
  public long getNumReadErrors() {
    return numReadErrors.get();
  }

  /** Returns the number of non-empty results fetched with verification requested. */
  public long getNumKeysVerified() {
    return numKeysVerified.get();
  }

  /** Returns the number of sequentially read (non-random) keys processed so far. */
  public long getNumUniqueKeysVerified() {
    return numUniqueKeysVerified.get();
  }

  /** Returns the number of empty results that were expected by the caller. */
  public long getNullResultsCount() {
    return nullResult.get();
  }

  /** Builds the status fragment with the verified, failure, error and null-result counters. */
  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "verified", numKeysVerified.get());
    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
    appendToStatus(sb, "NULL RESULT", nullResult.get());
    return sb.toString();
  }
}