001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.Callable;
030import org.apache.commons.lang3.exception.ExceptionUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HTestConst;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
046import org.apache.hadoop.hbase.client.AsyncConnection;
047import org.apache.hadoop.hbase.client.AsyncTable;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.filter.Filter;
058import org.apache.hadoop.hbase.filter.FilterBase;
059import org.apache.hadoop.hbase.ipc.HBaseRpcController;
060import org.apache.hadoop.hbase.ipc.RpcCall;
061import org.apache.hadoop.hbase.testclassification.LargeTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.util.Threads;
065import org.apache.hadoop.hbase.wal.WAL;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Before;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.mockito.Mockito;
074
075import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
076import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
077import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
078
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
081
082/**
083 * Here we test to make sure that scans return the expected Results when the server is sending the
084 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
085 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
086 * the client when the server has exceeded the time limit during the processing of the scan. When
087 * the time limit is reached, the server will return to the Client whatever Results it has
088 * accumulated (potentially empty).
089 */
090@Category(LargeTests.class)
091public class TestScannerHeartbeatMessages {
092
093  @ClassRule
094  public static final HBaseClassTestRule CLASS_RULE =
095    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
096
097  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
098
099  private static AsyncConnection CONN;
100
101  /**
102   * Table configuration
103   */
104  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
105
106  private static int NUM_ROWS = 5;
107  private static byte[] ROW = Bytes.toBytes("testRow");
108  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
109
110  private static int NUM_FAMILIES = 4;
111  private static byte[] FAMILY = Bytes.toBytes("testFamily");
112  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
113
114  private static int NUM_QUALIFIERS = 3;
115  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
116  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
117
118  private static int VALUE_SIZE = 128;
119  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
120
121  // The time limit should be based on the rpc timeout at client, or the client will regards
122  // the request as timeout before server return a heartbeat.
123  private static int SERVER_TIMEOUT = 60000;
124
125  // Time, in milliseconds, that the client will wait for a response from the server before timing
126  // out. This value is used server side to determine when it is necessary to send a heartbeat
127  // message to the client. Time limit will be 500 ms.
128  private static int CLIENT_TIMEOUT = 1000;
129
130  // In this test, we sleep after reading each row. So we should make sure after we get some number
131  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
132  private static int DEFAULT_ROW_SLEEP_TIME = 300;
133
134  // Similar with row sleep time.
135  private static int DEFAULT_CF_SLEEP_TIME = 300;
136
137  @BeforeClass
138  public static void setUpBeforeClass() throws Exception {
139    Configuration conf = TEST_UTIL.getConfiguration();
140
141    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
142    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
143    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
144    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
145    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
146
147    // Check the timeout condition after every cell
148    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
149    TEST_UTIL.startMiniCluster(1);
150
151    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
152
153    Configuration newConf = new Configuration(conf);
154    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
155    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
156    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
157  }
158
159  @Test
160  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
161    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
162    RSRpcServices services = new RSRpcServices(rs);
163    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
164    // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
165    // finally, 25 is fatal levels of queueing -- exceeding timeout
166    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
167
168    // assume timeout of 100ms
169    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
170    when(mockController.getCallTimeout()).thenReturn(100);
171
172    // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
173    EnvironmentEdgeManager.injectEdge(() -> 200L);
174
175    try {
176      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
177      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
178      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
179      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
180      // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
181      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
182        services.getTimeLimit(mockRpcCall, mockController, true));
183      // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
184      // timed out calls in the queue. in this case we still fallback on default minimum for now.
185      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
186        services.getTimeLimit(mockRpcCall, mockController, true));
187    } finally {
188      EnvironmentEdgeManager.reset();
189    }
190
191  }
192
193  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
194    byte[] cellValue) throws IOException {
195    Table ht = TEST_UTIL.createTable(name, families);
196    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
197    ht.put(puts);
198  }
199
200  /**
201   * Make puts to put the input value into each combination of row, family, and qualifier
202   */
203  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
204    byte[] value) throws IOException {
205    Put put;
206    ArrayList<Put> puts = new ArrayList<>();
207
208    for (int row = 0; row < rows.length; row++) {
209      put = new Put(rows[row]);
210      for (int fam = 0; fam < families.length; fam++) {
211        for (int qual = 0; qual < qualifiers.length; qual++) {
212          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
213          put.add(kv);
214        }
215      }
216      puts.add(put);
217    }
218
219    return puts;
220  }
221
222  @AfterClass
223  public static void tearDownAfterClass() throws Exception {
224    Closeables.close(CONN, true);
225    TEST_UTIL.shutdownMiniCluster();
226  }
227
228  @Before
229  public void setupBeforeTest() throws Exception {
230    disableSleeping();
231  }
232
233  @After
234  public void teardownAfterTest() throws Exception {
235    disableSleeping();
236  }
237
238  /**
239   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
240   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
241   * disabled, the test should throw an exception.
242   */
243  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
244    HeartbeatRPCServices.heartbeatsEnabled = true;
245
246    try {
247      testCallable.call();
248    } catch (Exception e) {
249      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
250        + ExceptionUtils.getStackTrace(e));
251    }
252
253    HeartbeatRPCServices.heartbeatsEnabled = false;
254    try {
255      testCallable.call();
256    } catch (Exception e) {
257      return;
258    } finally {
259      HeartbeatRPCServices.heartbeatsEnabled = true;
260    }
261    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
262      + " is not thrown, the test case is not testing the importance of heartbeat messages");
263  }
264
265  /**
266   * Test the case that the time limit for the scan is reached after each full row of cells is
267   * fetched.
268   */
269  @Test
270  public void testHeartbeatBetweenRows() throws Exception {
271    testImportanceOfHeartbeats(new Callable<Void>() {
272
273      @Override
274      public Void call() throws Exception {
275        // Configure the scan so that it can read the entire table in a single RPC. We want to test
276        // the case where a scan stops on the server side due to a time limit
277        Scan scan = new Scan();
278        scan.setMaxResultSize(Long.MAX_VALUE);
279        scan.setCaching(Integer.MAX_VALUE);
280
281        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
282        return null;
283      }
284    });
285  }
286
287  /**
288   * Test the case that the time limit for scans is reached in between column families
289   */
290  @Test
291  public void testHeartbeatBetweenColumnFamilies() throws Exception {
292    testImportanceOfHeartbeats(new Callable<Void>() {
293      @Override
294      public Void call() throws Exception {
295        // Configure the scan so that it can read the entire table in a single RPC. We want to test
296        // the case where a scan stops on the server side due to a time limit
297        Scan baseScan = new Scan();
298        baseScan.setMaxResultSize(Long.MAX_VALUE);
299        baseScan.setCaching(Integer.MAX_VALUE);
300
301        // Copy the scan before each test. When a scan object is used by a scanner, some of its
302        // fields may be changed such as start row
303        Scan scanCopy = new Scan(baseScan);
304        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
305        scanCopy = new Scan(baseScan);
306        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
307        return null;
308      }
309    });
310  }
311
312  public static class SparseCellFilter extends FilterBase {
313
314    @Override
315    public ReturnCode filterCell(final Cell v) throws IOException {
316      try {
317        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
318      } catch (InterruptedException e) {
319        Thread.currentThread().interrupt();
320      }
321      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
322        ? ReturnCode.INCLUDE
323        : ReturnCode.SKIP;
324    }
325
326    public static Filter parseFrom(final byte[] pbBytes) {
327      return new SparseCellFilter();
328    }
329  }
330
331  public static class SparseRowFilter extends FilterBase {
332
333    @Override
334    public boolean filterRowKey(Cell cell) throws IOException {
335      try {
336        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
337      } catch (InterruptedException e) {
338        Thread.currentThread().interrupt();
339      }
340      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
341    }
342
343    public static Filter parseFrom(final byte[] pbBytes) {
344      return new SparseRowFilter();
345    }
346  }
347
348  /**
349   * Test the case that there is a filter which filters most of cells
350   */
351  @Test
352  public void testHeartbeatWithSparseCellFilter() throws Exception {
353    testImportanceOfHeartbeats(new Callable<Void>() {
354      @Override
355      public Void call() throws Exception {
356        Scan scan = new Scan();
357        scan.setMaxResultSize(Long.MAX_VALUE);
358        scan.setCaching(Integer.MAX_VALUE);
359        scan.setFilter(new SparseCellFilter());
360        try (ScanPerNextResultScanner scanner =
361          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
362          int num = 0;
363          while (scanner.next() != null) {
364            num++;
365          }
366          assertEquals(1, num);
367        }
368
369        scan = new Scan();
370        scan.setMaxResultSize(Long.MAX_VALUE);
371        scan.setCaching(Integer.MAX_VALUE);
372        scan.setFilter(new SparseCellFilter());
373        scan.setAllowPartialResults(true);
374        try (ScanPerNextResultScanner scanner =
375          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
376          int num = 0;
377          while (scanner.next() != null) {
378            num++;
379          }
380          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
381        }
382
383        return null;
384      }
385    });
386  }
387
388  /**
389   * Test the case that there is a filter which filters most of rows
390   */
391  @Test
392  public void testHeartbeatWithSparseRowFilter() throws Exception {
393    testImportanceOfHeartbeats(new Callable<Void>() {
394      @Override
395      public Void call() throws Exception {
396        Scan scan = new Scan();
397        scan.setMaxResultSize(Long.MAX_VALUE);
398        scan.setCaching(Integer.MAX_VALUE);
399        scan.setFilter(new SparseRowFilter());
400        try (ScanPerNextResultScanner scanner =
401          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
402          int num = 0;
403          while (scanner.next() != null) {
404            num++;
405          }
406          assertEquals(1, num);
407        }
408
409        return null;
410      }
411    });
412  }
413
414  /**
415   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
416   * necessary
417   * @param scan          The scan configuration being tested
418   * @param rowSleepTime  The time to sleep between fetches of row cells
419   * @param cfSleepTime   The time to sleep between fetches of column family cells
420   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
421   *                      that column family are fetched
422   */
423  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
424    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
425    disableSleeping();
426    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
427    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
428    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
429
430    Result r1 = null;
431    Result r2 = null;
432
433    while ((r1 = scanner.next()) != null) {
434      // Enforce the specified sleep conditions during calls to the heartbeat scanner
435      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
436      r2 = scannerWithHeartbeats.next();
437      disableSleeping();
438
439      assertTrue(r2 != null);
440      try {
441        Result.compareResults(r1, r2);
442      } catch (Exception e) {
443        fail(e.getMessage());
444      }
445    }
446
447    assertTrue(scannerWithHeartbeats.next() == null);
448    scanner.close();
449    scannerWithHeartbeats.close();
450  }
451
452  /**
453   * Helper method for setting the time to sleep between rows and column families. If a sleep time
454   * is negative then that sleep will be disabled
455   */
456  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
457    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
458    HeartbeatHRegion.rowSleepTime = rowSleepTime;
459
460    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
461    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
462    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
463  }
464
465  /**
466   * Disable the sleeping mechanism server side.
467   */
468  private static void disableSleeping() {
469    HeartbeatHRegion.sleepBetweenRows = false;
470    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
471  }
472
473  /**
474   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
475   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
476   */
477  private static class HeartbeatHRegionServer extends HRegionServer {
478    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
479      super(conf);
480    }
481
482    @Override
483    protected RSRpcServices createRpcServices() throws IOException {
484      return new HeartbeatRPCServices(this);
485    }
486  }
487
488  /**
489   * Custom RSRpcServices instance that allows heartbeat support to be toggled
490   */
491  private static class HeartbeatRPCServices extends RSRpcServices {
492    private static volatile boolean heartbeatsEnabled = true;
493
494    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
495      super(rs);
496    }
497
498    @Override
499    public ScanResponse scan(RpcController controller, ScanRequest request)
500      throws ServiceException {
501      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
502      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
503      return super.scan(controller, builder.build());
504    }
505  }
506
507  /**
508   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
509   * between fetches of row Results and/or column family cells. Useful for emulating an instance
510   * where the server is taking a long time to process a client's scan request
511   */
512  private static class HeartbeatHRegion extends HRegion {
513    // Row sleeps occur AFTER each row worth of cells is retrieved.
514    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
515    private static volatile boolean sleepBetweenRows = false;
516
517    // The sleep for column families can be initiated before or after we fetch the cells for the
518    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
519    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
520    // limit will be reached inside RegionScanner after all the cells for a column family have been
521    // retrieved.
522    private static volatile boolean sleepBeforeColumnFamily = false;
523    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
524    private static volatile boolean sleepBetweenColumnFamilies = false;
525
526    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
527      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
528      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
529    }
530
531    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
532      TableDescriptor htd, RegionServerServices rsServices) {
533      super(fs, wal, confParam, htd, rsServices);
534    }
535
536    private static void columnFamilySleep() {
537      if (sleepBetweenColumnFamilies) {
538        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
539      }
540    }
541
542    private static void rowSleep() {
543      if (sleepBetweenRows) {
544        Threads.sleepWithoutInterrupt(rowSleepTime);
545      }
546    }
547
548    // Instantiate the custom heartbeat region scanners
549    @Override
550    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
551      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
552      if (scan.isReversed()) {
553        if (scan.getFilter() != null) {
554          scan.getFilter().setReversed(true);
555        }
556        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
557      }
558      return new HeartbeatRegionScanner(scan, additionalScanners, this);
559    }
560  }
561
562  /**
563   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
564   * and/or column family cells
565   */
566  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
567    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
568      HRegion region) throws IOException {
569      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
570    }
571
572    @Override
573    public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context)
574      throws IOException {
575      boolean moreRows = super.nextRaw(outResults, context);
576      HeartbeatHRegion.rowSleep();
577      return moreRows;
578    }
579
580    @Override
581    protected void initializeKVHeap(List<KeyValueScanner> scanners,
582      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
583      this.storeHeap =
584        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
585      if (!joinedScanners.isEmpty()) {
586        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
587          (CellComparatorImpl) region.getCellComparator());
588      }
589    }
590  }
591
592  /**
593   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
594   * column family cells
595   */
596  private static class HeartbeatRegionScanner extends RegionScannerImpl {
597    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
598      throws IOException {
599      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
600    }
601
602    @Override
603    public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context)
604      throws IOException {
605      boolean moreRows = super.nextRaw(outResults, context);
606      HeartbeatHRegion.rowSleep();
607      return moreRows;
608    }
609
610    @Override
611    protected void initializeKVHeap(List<KeyValueScanner> scanners,
612      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
613      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
614      if (!joinedScanners.isEmpty()) {
615        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
616      }
617    }
618  }
619
620  /**
621   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
622   * cells. Useful for testing
623   */
624  private static final class HeartbeatKVHeap extends KeyValueHeap {
625    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
626      throws IOException {
627      super(scanners, comparator);
628    }
629
630    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
631      throws IOException {
632      super(scanners, comparator);
633    }
634
635    @Override
636    public boolean next(List<? super ExtendedCell> result, ScannerContext context)
637      throws IOException {
638      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
639      boolean moreRows = super.next(result, context);
640      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
641      return moreRows;
642    }
643  }
644
645  /**
646   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
647   * cells.
648   */
649  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
650    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
651      CellComparatorImpl comparator) throws IOException {
652      super(scanners, comparator);
653    }
654
655    @Override
656    public boolean next(List<? super ExtendedCell> result, ScannerContext context)
657      throws IOException {
658      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
659      boolean moreRows = super.next(result, context);
660      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
661      return moreRows;
662    }
663  }
664}