001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Random;
024import java.util.Set;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Consistency;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/** Creates multiple threads that read and verify previously written data */
041@InterfaceAudience.Private
042public class MultiThreadedReader extends MultiThreadedAction {
043  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
044
045  protected Set<HBaseReaderThread> readers = new HashSet<>();
046  private final double verifyPercent;
047  protected volatile boolean aborted;
048
049  protected MultiThreadedWriterBase writer = null;
050
051  /**
052   * The number of keys verified in a sequence. This will never be larger than the total number of
053   * keys in the range. The reader might also verify random keys when it catches up with the writer.
054   */
055  private final AtomicLong numUniqueKeysVerified = new AtomicLong();
056
057  /**
058   * Default maximum number of read errors to tolerate before shutting down all readers.
059   */
060  public static final int DEFAULT_MAX_ERRORS = 10;
061
062  /**
063   * Default "window" size between the last key written by the writer and the key that we attempt to
064   * read. The lower this number, the stricter our testing is. If this is zero, we always attempt to
065   * read the highest key in the contiguous sequence of keys written by the writers.
066   */
067  public static final int DEFAULT_KEY_WINDOW = 0;
068
069  /**
070   * Default batch size for multigets
071   */
072  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)
073
074  protected AtomicLong numKeysVerified = new AtomicLong(0);
075  protected AtomicLong numReadErrors = new AtomicLong(0);
076  protected AtomicLong numReadFailures = new AtomicLong(0);
077  protected AtomicLong nullResult = new AtomicLong(0);
078  private int maxErrors = DEFAULT_MAX_ERRORS;
079  private int keyWindow = DEFAULT_KEY_WINDOW;
080  private int batchSize = DEFAULT_BATCH_SIZE;
081  private int regionReplicaId = -1; // particular region replica id to do reads against if set
082
083  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
084    double verifyPercent) throws IOException {
085    super(dataGen, conf, tableName, "R");
086    this.verifyPercent = verifyPercent;
087  }
088
089  public void linkToWriter(MultiThreadedWriterBase writer) {
090    this.writer = writer;
091    writer.setTrackWroteKeys(true);
092  }
093
094  public void setMaxErrors(int maxErrors) {
095    this.maxErrors = maxErrors;
096  }
097
098  public void setKeyWindow(int keyWindow) {
099    this.keyWindow = keyWindow;
100  }
101
102  public void setMultiGetBatchSize(int batchSize) {
103    this.batchSize = batchSize;
104  }
105
106  public void setRegionReplicaId(int regionReplicaId) {
107    this.regionReplicaId = regionReplicaId;
108  }
109
110  @Override
111  public void start(long startKey, long endKey, int numThreads) throws IOException {
112    super.start(startKey, endKey, numThreads);
113    if (verbose) {
114      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
115    }
116
117    addReaderThreads(numThreads);
118    startThreads(readers);
119  }
120
121  protected void addReaderThreads(int numThreads) throws IOException {
122    for (int i = 0; i < numThreads; ++i) {
123      HBaseReaderThread reader = createReaderThread(i);
124      readers.add(reader);
125    }
126  }
127
128  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
129    HBaseReaderThread reader = new HBaseReaderThread(readerId);
130    Threads.setLoggingUncaughtExceptionHandler(reader);
131    return reader;
132  }
133
134  public class HBaseReaderThread extends Thread {
135    protected final int readerId;
136    protected final Table table;
137
138    /** The "current" key being read. Increases from startKey to endKey. */
139    private long curKey;
140
141    /** Time when the thread started */
142    protected long startTimeMs;
143
144    /** If we are ahead of the writer and reading a random key. */
145    private boolean readingRandomKey;
146
147    private boolean printExceptionTrace = true;
148
149    /**
150     * @param readerId only the keys with this remainder from division by {@link #numThreads} will
151     *                 be read by this thread
152     */
153    public HBaseReaderThread(int readerId) throws IOException {
154      this.readerId = readerId;
155      table = createTable();
156      setName(getClass().getSimpleName() + "_" + readerId);
157    }
158
159    protected Table createTable() throws IOException {
160      return connection.getTable(tableName);
161    }
162
163    @Override
164    public void run() {
165      try {
166        runReader();
167      } finally {
168        closeTable();
169        numThreadsWorking.decrementAndGet();
170      }
171    }
172
173    protected void closeTable() {
174      try {
175        if (table != null) {
176          table.close();
177        }
178      } catch (IOException e) {
179        LOG.error("Error closing table", e);
180      }
181    }
182
183    private void runReader() {
184      if (verbose) {
185        LOG.info("Started thread #" + readerId + " for reads...");
186      }
187
188      startTimeMs = EnvironmentEdgeManager.currentTime();
189      curKey = startKey;
190      long[] keysForThisReader = new long[batchSize];
191      while (curKey < endKey && !aborted) {
192        int readingRandomKeyStartIndex = -1;
193        int numKeys = 0;
194        // if multiGet, loop until we have the number of keys equal to the batch size
195        do {
196          long k = getNextKeyToRead();
197          if (k < startKey || k >= endKey) {
198            numReadErrors.incrementAndGet();
199            throw new AssertionError("Load tester logic error: proposed key " + "to read " + k
200              + " is out of range (startKey=" + startKey + ", endKey=" + endKey + ")");
201          }
202          if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
203            // Skip keys that this thread should not read, as well as the keys
204            // that we know the writer failed to write.
205            continue;
206          }
207          keysForThisReader[numKeys] = k;
208          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
209            // store the first index of a random read
210            readingRandomKeyStartIndex = numKeys;
211          }
212          numKeys++;
213        } while (numKeys < batchSize && curKey < endKey && !aborted);
214
215        if (numKeys > 0) { // meaning there is some key to read
216          readKey(keysForThisReader);
217          // We have verified some unique key(s).
218          numUniqueKeysVerified
219            .getAndAdd(readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
220        }
221      }
222    }
223
224    /**
225     * Should only be used for the concurrent writer/reader workload. The maximum key we are allowed
226     * to read, subject to the "key window" constraint.
227     */
228    private long maxKeyWeCanRead() {
229      long insertedUpToKey = writer.wroteUpToKey();
230      if (insertedUpToKey >= endKey - 1) {
231        // The writer has finished writing our range, so we can read any
232        // key in the range.
233        return endKey - 1;
234      }
235      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
236    }
237
238    protected long getNextKeyToRead() {
239      readingRandomKey = false;
240      if (writer == null || curKey <= maxKeyWeCanRead()) {
241        return curKey++;
242      }
243
244      // We caught up with the writer. See if we can read any keys at all.
245      long maxKeyToRead;
246      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
247        // The writer has not written sufficient keys for us to be able to read
248        // anything at all. Sleep a bit. This should only happen in the
249        // beginning of a load test run.
250        Threads.sleepWithoutInterrupt(50);
251      }
252
253      if (curKey <= maxKeyToRead) {
254        // The writer wrote some keys, and we are now allowed to read our
255        // current key.
256        return curKey++;
257      }
258
259      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
260      // Don't increment the current key -- we still have to try reading it
261      // later. Set a flag to make sure that we don't count this key towards
262      // the set of unique keys we have verified.
263      readingRandomKey = true;
264      return startKey
265        + Math.abs(ThreadLocalRandom.current().nextLong()) % (maxKeyToRead - startKey + 1);
266    }
267
268    private Get[] readKey(long[] keysToRead) {
269      Random rand = ThreadLocalRandom.current();
270      Get[] gets = new Get[keysToRead.length];
271      int i = 0;
272      for (long keyToRead : keysToRead) {
273        try {
274          gets[i] = createGet(keyToRead);
275          if (keysToRead.length == 1) {
276            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
277          }
278          i++;
279        } catch (IOException e) {
280          numReadFailures.addAndGet(1);
281          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
282            + ", time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
283          if (printExceptionTrace) {
284            LOG.warn(e.toString(), e);
285            printExceptionTrace = false;
286          }
287        }
288      }
289      if (keysToRead.length > 1) {
290        try {
291          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
292        } catch (IOException e) {
293          numReadFailures.addAndGet(gets.length);
294          for (long keyToRead : keysToRead) {
295            LOG.debug(
296              "[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: "
297                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
298          }
299          if (printExceptionTrace) {
300            LOG.warn(e.toString(), e);
301            printExceptionTrace = false;
302          }
303        }
304      }
305      return gets;
306    }
307
308    protected Get createGet(long keyToRead) throws IOException {
309      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
310      StringBuilder cfsString = new StringBuilder();
311      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
312      for (byte[] cf : columnFamilies) {
313        get.addFamily(cf);
314        if (verbose) {
315          if (cfsString.length() > 0) {
316            cfsString.append(", ");
317          }
318          cfsString.append("[").append(Bytes.toStringBinary(cf)).append("]");
319        }
320      }
321      get = dataGenerator.beforeGet(keyToRead, get);
322      if (regionReplicaId > 0) {
323        get.setReplicaId(regionReplicaId);
324        get.setConsistency(Consistency.TIMELINE);
325      }
326      if (verbose) {
327        LOG.info("[{}] Querying key {}, cfs {}", readerId, keyToRead, cfsString.toString());
328      }
329      return get;
330    }
331
332    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
333      // read the data
334      long start = System.nanoTime();
335      // Uses multi/batch gets
336      Result[] results = table.get(Arrays.asList(gets));
337      long end = System.nanoTime();
338      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
339    }
340
341    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
342      // read the data
343
344      long start = System.nanoTime();
345      // Uses simple get
346      Result result = table.get(get);
347      long end = System.nanoTime();
348      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
349    }
350
351    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
352      Result[] results, Table table, boolean isNullExpected) throws IOException {
353      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
354      numKeys.addAndGet(gets.length);
355      int i = 0;
356      for (Result result : results) {
357        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
358          isNullExpected);
359      }
360    }
361
362    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
363      Result result, Table table, boolean isNullExpected) throws IOException {
364      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano, new Result[] { result },
365        table, isNullExpected);
366    }
367
368    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result,
369      Table table, boolean isNullExpected) throws IOException {
370      if (!result.isEmpty()) {
371        if (verify) {
372          numKeysVerified.incrementAndGet();
373        }
374      } else {
375        HRegionLocation hloc;
376        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
377          hloc = locator.getRegionLocation(get.getRow());
378        }
379        String rowKey = Bytes.toString(get.getRow());
380        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
381        if (isNullExpected) {
382          nullResult.incrementAndGet();
383          LOG.debug("Null result obtained for the key =" + rowKey);
384          return;
385        }
386      }
387      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
388      long numErrorsAfterThis = 0;
389      if (isOk) {
390        long cols = 0;
391        // Count the columns for reporting purposes.
392        for (byte[] cf : result.getMap().keySet()) {
393          cols += result.getFamilyMap(cf).size();
394        }
395        numCols.addAndGet(cols);
396      } else {
397        if (writer != null) {
398          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
399        }
400        numErrorsAfterThis = numReadErrors.incrementAndGet();
401      }
402
403      if (numErrorsAfterThis > maxErrors) {
404        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
405        aborted = true;
406      }
407    }
408  }
409
410  public long getNumReadFailures() {
411    return numReadFailures.get();
412  }
413
414  public long getNumReadErrors() {
415    return numReadErrors.get();
416  }
417
418  public long getNumKeysVerified() {
419    return numKeysVerified.get();
420  }
421
422  public long getNumUniqueKeysVerified() {
423    return numUniqueKeysVerified.get();
424  }
425
426  public long getNullResultsCount() {
427    return nullResult.get();
428  }
429
430  @Override
431  protected String progressInfo() {
432    StringBuilder sb = new StringBuilder();
433    appendToStatus(sb, "verified", numKeysVerified.get());
434    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
435    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
436    appendToStatus(sb, "NULL RESULT", nullResult.get());
437    return sb.toString();
438  }
439}