001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.Callable; 030import org.apache.commons.lang3.exception.ExceptionUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.CellComparatorImpl; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HTestConst; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 046import org.apache.hadoop.hbase.client.AsyncConnection; 047import org.apache.hadoop.hbase.client.AsyncTable; 048import org.apache.hadoop.hbase.client.ConnectionFactory; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.ResultScanner; 053import org.apache.hadoop.hbase.client.Scan; 054import org.apache.hadoop.hbase.client.ScanPerNextResultScanner; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.filter.Filter; 058import org.apache.hadoop.hbase.filter.FilterBase; 059import org.apache.hadoop.hbase.ipc.HBaseRpcController; 060import org.apache.hadoop.hbase.ipc.RpcCall; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.apache.hadoop.hbase.util.Threads; 065import org.apache.hadoop.hbase.wal.WAL; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Before; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.mockito.Mockito; 074 075import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 076import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 077import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 081 082/** 083 * Here we test to make sure that scans return the expected Results when the server is sending the 084 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent 085 * the scanner on the client side from timing out). A heartbeat message is sent from the server to 086 * the client when the server has exceeded the time limit during the processing of the scan. When 087 * the time limit is reached, the server will return to the Client whatever Results it has 088 * accumulated (potentially empty). 089 */ 090@Category(LargeTests.class) 091public class TestScannerHeartbeatMessages { 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class); 096 097 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 098 099 private static AsyncConnection CONN; 100 101 /** 102 * Table configuration 103 */ 104 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); 105 106 private static int NUM_ROWS = 5; 107 private static byte[] ROW = Bytes.toBytes("testRow"); 108 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); 109 110 private static int NUM_FAMILIES = 4; 111 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 112 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); 113 114 private static int NUM_QUALIFIERS = 3; 115 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 116 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); 117 118 private static int VALUE_SIZE = 128; 119 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); 120 121 // The time limit should be based on the rpc timeout at client, or the client will regards 122 // the request as timeout before server return a heartbeat. 123 private static int SERVER_TIMEOUT = 60000; 124 125 // Time, in milliseconds, that the client will wait for a response from the server before timing 126 // out. This value is used server side to determine when it is necessary to send a heartbeat 127 // message to the client. Time limit will be 500 ms. 128 private static int CLIENT_TIMEOUT = 1000; 129 130 // In this test, we sleep after reading each row. So we should make sure after we get some number 131 // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping. 132 private static int DEFAULT_ROW_SLEEP_TIME = 300; 133 134 // Similar with row sleep time. 135 private static int DEFAULT_CF_SLEEP_TIME = 300; 136 137 @BeforeClass 138 public static void setUpBeforeClass() throws Exception { 139 Configuration conf = TEST_UTIL.getConfiguration(); 140 141 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); 142 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); 143 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); 144 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); 145 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); 146 147 // Check the timeout condition after every cell 148 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); 149 TEST_UTIL.startMiniCluster(1); 150 151 createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); 152 153 Configuration newConf = new Configuration(conf); 154 newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); 155 newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); 156 CONN = ConnectionFactory.createAsyncConnection(newConf).get(); 157 } 158 159 @Test 160 public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException { 161 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 162 RSRpcServices services = new RSRpcServices(rs); 163 RpcCall mockRpcCall = Mockito.mock(RpcCall.class); 164 // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing) 165 // finally, 25 is fatal levels of queueing -- exceeding timeout 166 when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L); 167 168 // assume timeout of 100ms 169 HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class); 170 when(mockController.getCallTimeout()).thenReturn(100); 171 172 // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas 173 EnvironmentEdgeManager.injectEdge(() -> 200L); 174 175 try { 176 // we queued for 20ms, leaving 80ms of timeout, which we divide by 2 177 assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 178 // we queued for 80ms, leaving 20ms of timeout, which we divide by 2 179 assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 180 // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum 181 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 182 services.getTimeLimit(mockRpcCall, mockController, true)); 183 // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop 184 // timed out calls in the queue. in this case we still fallback on default minimum for now. 185 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 186 services.getTimeLimit(mockRpcCall, mockController, true)); 187 } finally { 188 EnvironmentEdgeManager.reset(); 189 } 190 191 } 192 193 static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers, 194 byte[] cellValue) throws IOException { 195 Table ht = TEST_UTIL.createTable(name, families); 196 List<Put> puts = createPuts(rows, families, qualifiers, cellValue); 197 ht.put(puts); 198 } 199 200 /** 201 * Make puts to put the input value into each combination of row, family, and qualifier 202 */ 203 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, 204 byte[] value) throws IOException { 205 Put put; 206 ArrayList<Put> puts = new ArrayList<>(); 207 208 for (int row = 0; row < rows.length; row++) { 209 put = new Put(rows[row]); 210 for (int fam = 0; fam < families.length; fam++) { 211 for (int qual = 0; qual < qualifiers.length; qual++) { 212 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); 213 put.add(kv); 214 } 215 } 216 puts.add(put); 217 } 218 219 return puts; 220 } 221 222 @AfterClass 223 public static void tearDownAfterClass() throws Exception { 224 Closeables.close(CONN, true); 225 TEST_UTIL.shutdownMiniCluster(); 226 } 227 228 @Before 229 public void setupBeforeTest() throws Exception { 230 disableSleeping(); 231 } 232 233 @After 234 public void teardownAfterTest() throws Exception { 235 disableSleeping(); 236 } 237 238 /** 239 * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass 240 * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are 241 * disabled, the test should throw an exception. 242 */ 243 private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException { 244 HeartbeatRPCServices.heartbeatsEnabled = true; 245 246 try { 247 testCallable.call(); 248 } catch (Exception e) { 249 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:" 250 + ExceptionUtils.getStackTrace(e)); 251 } 252 253 HeartbeatRPCServices.heartbeatsEnabled = false; 254 try { 255 testCallable.call(); 256 } catch (Exception e) { 257 return; 258 } finally { 259 HeartbeatRPCServices.heartbeatsEnabled = true; 260 } 261 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception " 262 + " is not thrown, the test case is not testing the importance of heartbeat messages"); 263 } 264 265 /** 266 * Test the case that the time limit for the scan is reached after each full row of cells is 267 * fetched. 268 */ 269 @Test 270 public void testHeartbeatBetweenRows() throws Exception { 271 testImportanceOfHeartbeats(new Callable<Void>() { 272 273 @Override 274 public Void call() throws Exception { 275 // Configure the scan so that it can read the entire table in a single RPC. We want to test 276 // the case where a scan stops on the server side due to a time limit 277 Scan scan = new Scan(); 278 scan.setMaxResultSize(Long.MAX_VALUE); 279 scan.setCaching(Integer.MAX_VALUE); 280 281 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); 282 return null; 283 } 284 }); 285 } 286 287 /** 288 * Test the case that the time limit for scans is reached in between column families 289 */ 290 @Test 291 public void testHeartbeatBetweenColumnFamilies() throws Exception { 292 testImportanceOfHeartbeats(new Callable<Void>() { 293 @Override 294 public Void call() throws Exception { 295 // Configure the scan so that it can read the entire table in a single RPC. We want to test 296 // the case where a scan stops on the server side due to a time limit 297 Scan baseScan = new Scan(); 298 baseScan.setMaxResultSize(Long.MAX_VALUE); 299 baseScan.setCaching(Integer.MAX_VALUE); 300 301 // Copy the scan before each test. When a scan object is used by a scanner, some of its 302 // fields may be changed such as start row 303 Scan scanCopy = new Scan(baseScan); 304 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false); 305 scanCopy = new Scan(baseScan); 306 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); 307 return null; 308 } 309 }); 310 } 311 312 public static class SparseCellFilter extends FilterBase { 313 314 @Override 315 public ReturnCode filterCell(final Cell v) throws IOException { 316 try { 317 Thread.sleep(CLIENT_TIMEOUT / 2 + 100); 318 } catch (InterruptedException e) { 319 Thread.currentThread().interrupt(); 320 } 321 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) 322 ? ReturnCode.INCLUDE 323 : ReturnCode.SKIP; 324 } 325 326 public static Filter parseFrom(final byte[] pbBytes) { 327 return new SparseCellFilter(); 328 } 329 } 330 331 public static class SparseRowFilter extends FilterBase { 332 333 @Override 334 public boolean filterRowKey(Cell cell) throws IOException { 335 try { 336 Thread.sleep(CLIENT_TIMEOUT / 2 - 100); 337 } catch (InterruptedException e) { 338 Thread.currentThread().interrupt(); 339 } 340 return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]); 341 } 342 343 public static Filter parseFrom(final byte[] pbBytes) { 344 return new SparseRowFilter(); 345 } 346 } 347 348 /** 349 * Test the case that there is a filter which filters most of cells 350 */ 351 @Test 352 public void testHeartbeatWithSparseCellFilter() throws Exception { 353 testImportanceOfHeartbeats(new Callable<Void>() { 354 @Override 355 public Void call() throws Exception { 356 Scan scan = new Scan(); 357 scan.setMaxResultSize(Long.MAX_VALUE); 358 scan.setCaching(Integer.MAX_VALUE); 359 scan.setFilter(new SparseCellFilter()); 360 try (ScanPerNextResultScanner scanner = 361 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 362 int num = 0; 363 while (scanner.next() != null) { 364 num++; 365 } 366 assertEquals(1, num); 367 } 368 369 scan = new Scan(); 370 scan.setMaxResultSize(Long.MAX_VALUE); 371 scan.setCaching(Integer.MAX_VALUE); 372 scan.setFilter(new SparseCellFilter()); 373 scan.setAllowPartialResults(true); 374 try (ScanPerNextResultScanner scanner = 375 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 376 int num = 0; 377 while (scanner.next() != null) { 378 num++; 379 } 380 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); 381 } 382 383 return null; 384 } 385 }); 386 } 387 388 /** 389 * Test the case that there is a filter which filters most of rows 390 */ 391 @Test 392 public void testHeartbeatWithSparseRowFilter() throws Exception { 393 testImportanceOfHeartbeats(new Callable<Void>() { 394 @Override 395 public Void call() throws Exception { 396 Scan scan = new Scan(); 397 scan.setMaxResultSize(Long.MAX_VALUE); 398 scan.setCaching(Integer.MAX_VALUE); 399 scan.setFilter(new SparseRowFilter()); 400 try (ScanPerNextResultScanner scanner = 401 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 402 int num = 0; 403 while (scanner.next() != null) { 404 num++; 405 } 406 assertEquals(1, num); 407 } 408 409 return null; 410 } 411 }); 412 } 413 414 /** 415 * Test the equivalence of a scan versus the same scan executed when heartbeat messages are 416 * necessary 417 * @param scan The scan configuration being tested 418 * @param rowSleepTime The time to sleep between fetches of row cells 419 * @param cfSleepTime The time to sleep between fetches of column family cells 420 * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for 421 * that column family are fetched 422 */ 423 private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, 424 int cfSleepTime, boolean sleepBeforeCf) throws Exception { 425 disableSleeping(); 426 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 427 final ResultScanner scanner = new ScanPerNextResultScanner(table, scan); 428 final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan); 429 430 Result r1 = null; 431 Result r2 = null; 432 433 while ((r1 = scanner.next()) != null) { 434 // Enforce the specified sleep conditions during calls to the heartbeat scanner 435 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); 436 r2 = scannerWithHeartbeats.next(); 437 disableSleeping(); 438 439 assertTrue(r2 != null); 440 try { 441 Result.compareResults(r1, r2); 442 } catch (Exception e) { 443 fail(e.getMessage()); 444 } 445 } 446 447 assertTrue(scannerWithHeartbeats.next() == null); 448 scanner.close(); 449 scannerWithHeartbeats.close(); 450 } 451 452 /** 453 * Helper method for setting the time to sleep between rows and column families. If a sleep time 454 * is negative then that sleep will be disabled 455 */ 456 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { 457 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; 458 HeartbeatHRegion.rowSleepTime = rowSleepTime; 459 460 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; 461 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; 462 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; 463 } 464 465 /** 466 * Disable the sleeping mechanism server side. 467 */ 468 private static void disableSleeping() { 469 HeartbeatHRegion.sleepBetweenRows = false; 470 HeartbeatHRegion.sleepBetweenColumnFamilies = false; 471 } 472 473 /** 474 * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of 475 * {@link RSRpcServices} to allow us to toggle support for heartbeat messages 476 */ 477 private static class HeartbeatHRegionServer extends HRegionServer { 478 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { 479 super(conf); 480 } 481 482 @Override 483 protected RSRpcServices createRpcServices() throws IOException { 484 return new HeartbeatRPCServices(this); 485 } 486 } 487 488 /** 489 * Custom RSRpcServices instance that allows heartbeat support to be toggled 490 */ 491 private static class HeartbeatRPCServices extends RSRpcServices { 492 private static volatile boolean heartbeatsEnabled = true; 493 494 public HeartbeatRPCServices(HRegionServer rs) throws IOException { 495 super(rs); 496 } 497 498 @Override 499 public ScanResponse scan(RpcController controller, ScanRequest request) 500 throws ServiceException { 501 ScanRequest.Builder builder = ScanRequest.newBuilder(request); 502 builder.setClientHandlesHeartbeats(heartbeatsEnabled); 503 return super.scan(controller, builder.build()); 504 } 505 } 506 507 /** 508 * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times 509 * between fetches of row Results and/or column family cells. Useful for emulating an instance 510 * where the server is taking a long time to process a client's scan request 511 */ 512 private static class HeartbeatHRegion extends HRegion { 513 // Row sleeps occur AFTER each row worth of cells is retrieved. 514 private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; 515 private static volatile boolean sleepBetweenRows = false; 516 517 // The sleep for column families can be initiated before or after we fetch the cells for the 518 // column family. If the sleep occurs BEFORE then the time limits will be reached inside 519 // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time 520 // limit will be reached inside RegionScanner after all the cells for a column family have been 521 // retrieved. 522 private static volatile boolean sleepBeforeColumnFamily = false; 523 private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; 524 private static volatile boolean sleepBetweenColumnFamilies = false; 525 526 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 527 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 528 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 529 } 530 531 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 532 TableDescriptor htd, RegionServerServices rsServices) { 533 super(fs, wal, confParam, htd, rsServices); 534 } 535 536 private static void columnFamilySleep() { 537 if (sleepBetweenColumnFamilies) { 538 Threads.sleepWithoutInterrupt(columnFamilySleepTime); 539 } 540 } 541 542 private static void rowSleep() { 543 if (sleepBetweenRows) { 544 Threads.sleepWithoutInterrupt(rowSleepTime); 545 } 546 } 547 548 // Instantiate the custom heartbeat region scanners 549 @Override 550 protected RegionScannerImpl instantiateRegionScanner(Scan scan, 551 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { 552 if (scan.isReversed()) { 553 if (scan.getFilter() != null) { 554 scan.getFilter().setReversed(true); 555 } 556 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); 557 } 558 return new HeartbeatRegionScanner(scan, additionalScanners, this); 559 } 560 } 561 562 /** 563 * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results 564 * and/or column family cells 565 */ 566 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { 567 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, 568 HRegion region) throws IOException { 569 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 570 } 571 572 @Override 573 public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context) 574 throws IOException { 575 boolean moreRows = super.nextRaw(outResults, context); 576 HeartbeatHRegion.rowSleep(); 577 return moreRows; 578 } 579 580 @Override 581 protected void initializeKVHeap(List<KeyValueScanner> scanners, 582 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 583 this.storeHeap = 584 new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator()); 585 if (!joinedScanners.isEmpty()) { 586 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, 587 (CellComparatorImpl) region.getCellComparator()); 588 } 589 } 590 } 591 592 /** 593 * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or 594 * column family cells 595 */ 596 private static class HeartbeatRegionScanner extends RegionScannerImpl { 597 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) 598 throws IOException { 599 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 600 } 601 602 @Override 603 public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context) 604 throws IOException { 605 boolean moreRows = super.nextRaw(outResults, context); 606 HeartbeatHRegion.rowSleep(); 607 return moreRows; 608 } 609 610 @Override 611 protected void initializeKVHeap(List<KeyValueScanner> scanners, 612 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 613 this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator()); 614 if (!joinedScanners.isEmpty()) { 615 this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator()); 616 } 617 } 618 } 619 620 /** 621 * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family 622 * cells. Useful for testing 623 */ 624 private static final class HeartbeatKVHeap extends KeyValueHeap { 625 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 626 throws IOException { 627 super(scanners, comparator); 628 } 629 630 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator) 631 throws IOException { 632 super(scanners, comparator); 633 } 634 635 @Override 636 public boolean next(List<? super ExtendedCell> result, ScannerContext context) 637 throws IOException { 638 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 639 boolean moreRows = super.next(result, context); 640 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 641 return moreRows; 642 } 643 } 644 645 /** 646 * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family 647 * cells. 648 */ 649 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { 650 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 651 CellComparatorImpl comparator) throws IOException { 652 super(scanners, comparator); 653 } 654 655 @Override 656 public boolean next(List<? super ExtendedCell> result, ScannerContext context) 657 throws IOException { 658 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 659 boolean moreRows = super.next(result, context); 660 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 661 return moreRows; 662 } 663 } 664}