001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.List; 036import java.util.concurrent.TimeUnit; 037import java.util.function.Consumer; 038import java.util.stream.IntStream; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CompareOperator; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HColumnDescriptor; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HRegionInfo; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTestConst; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.MiniHBaseCluster; 051import org.apache.hadoop.hbase.StartMiniClusterOption; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.TableNameTestRule; 054import org.apache.hadoop.hbase.TableNotFoundException; 055import org.apache.hadoop.hbase.exceptions.DeserializationException; 056import org.apache.hadoop.hbase.filter.BinaryComparator; 057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 058import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 059import org.apache.hadoop.hbase.filter.FilterBase; 060import org.apache.hadoop.hbase.filter.QualifierFilter; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.junit.AfterClass; 067import org.junit.Before; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.runner.RunWith; 073import org.junit.runners.Parameterized; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 078 079/** 080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different 081 * registry implementations. 082 */ 083@Category({ MediumTests.class, ClientTests.class }) 084@RunWith(Parameterized.class) 085public class TestScannersFromClientSide { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 092 093 private static HBaseTestingUtility TEST_UTIL; 094 private static byte[] ROW = Bytes.toBytes("testRow"); 095 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 096 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 097 private static byte[] VALUE = Bytes.toBytes("testValue"); 098 099 @Rule 100 public TableNameTestRule name = new TableNameTestRule(); 101 102 /** 103 * @throws java.lang.Exception 104 */ 105 @AfterClass 106 public static void tearDownAfterClass() throws Exception { 107 if (TEST_UTIL != null) { 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 } 111 112 /** 113 * @throws java.lang.Exception 114 */ 115 @Before 116 public void setUp() throws Exception { 117 // Nothing to do. 118 } 119 120 @Parameterized.Parameters 121 public static Collection<Object[]> parameters() { 122 return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 }, 123 { ZKConnectionRegistry.class, 1 } }); 124 } 125 126 /** 127 * JUnit does not provide an easy way to run a hook after each parameterized run. Without that 128 * there is no easy way to restart the test cluster after each parameterized run. Annotation 129 * BeforeParam does not work either because it runs before parameterization and hence does not 130 * have access to the test parameters (which is weird). This *hack* checks if the current instance 131 * of test cluster configuration has the passed parameterized configs. In such a case, we can just 132 * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it 133 * saves a ton of time for the full test and de-flakes it. 134 */ 135 private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { 136 // initialize() is called for every unit test, however we only want to reset the cluster state 137 // at the end of every parameterized run. 138 if (TEST_UTIL == null) { 139 return false; 140 } 141 Configuration conf = TEST_UTIL.getConfiguration(); 142 Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 143 ZKConnectionRegistry.class); 144 int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 145 AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); 146 return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; 147 } 148 149 public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception { 150 if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { 151 return; 152 } 153 if (TEST_UTIL != null) { 154 // We reached the end of a parameterized run, clean up the cluster. 155 TEST_UTIL.shutdownMiniCluster(); 156 } 157 TEST_UTIL = new HBaseTestingUtility(); 158 Configuration conf = TEST_UTIL.getConfiguration(); 159 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 160 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, 161 ConnectionRegistry.class); 162 Preconditions.checkArgument(numHedgedReqs > 0); 163 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); 164 StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); 165 // Multiple masters needed only when hedged reads for master registry are enabled. 166 builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); 167 TEST_UTIL.startMiniCluster(builder.build()); 168 } 169 170 @Test 171 public void testScanImmutable() throws IOException { 172 TableName tableName = name.getTableName(); 173 Table table = TEST_UTIL.createTable(tableName, FAMILY); 174 TEST_UTIL.loadRandomRows(table, FAMILY, 100, 100); 175 176 Scan scan = new Scan().setCaching(-1).setMvccReadPoint(-1).setScanMetricsEnabled(true); 177 178 try (ResultScanner scanner = table.getScanner(scan)) { 179 scanner.next(1000); 180 } 181 // these 2 should be unchanged 182 assertEquals(-1, scan.getCaching()); 183 assertEquals(-1, scan.getMvccReadPoint()); 184 // scan metrics should be populated 185 assertNotNull(scan.getScanMetrics()); 186 assertEquals(scan.getScanMetrics().countOfRegions.get(), 1); 187 } 188 189 /** 190 * Test from client side for batch of scan 191 */ 192 @Test 193 public void testScanBatch() throws Exception { 194 final TableName tableName = name.getTableName(); 195 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 196 197 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 198 199 Put put; 200 Scan scan; 201 Delete delete; 202 Result result; 203 ResultScanner scanner; 204 boolean toLog = true; 205 List<Cell> kvListExp; 206 207 // table: row, family, c0:0, c1:1, ... , c7:7 208 put = new Put(ROW); 209 for (int i = 0; i < QUALIFIERS.length; i++) { 210 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 211 put.add(kv); 212 } 213 ht.put(put); 214 215 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 216 put = new Put(ROW); 217 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 218 put.add(kv); 219 ht.put(put); 220 221 // delete upto ts: 3 222 delete = new Delete(ROW); 223 delete.addFamily(FAMILY, 3); 224 ht.delete(delete); 225 226 // without batch 227 scan = new Scan().withStartRow(ROW); 228 scan.setMaxVersions(); 229 scanner = ht.getScanner(scan); 230 231 // c4:4, c5:5, c6:6, c7:7 232 kvListExp = new ArrayList<>(); 233 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 234 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 235 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 236 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 237 result = scanner.next(); 238 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 239 240 // with batch 241 scan = new Scan().withStartRow(ROW); 242 scan.setMaxVersions(); 243 scan.setBatch(2); 244 scanner = ht.getScanner(scan); 245 246 // First batch: c4:4, c5:5 247 kvListExp = new ArrayList<>(); 248 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 249 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 250 result = scanner.next(); 251 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 252 253 // Second batch: c6:6, c7:7 254 kvListExp = new ArrayList<>(); 255 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 256 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 257 result = scanner.next(); 258 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 259 260 } 261 262 @Test 263 public void testMaxResultSizeIsSetToDefault() throws Exception { 264 final TableName tableName = name.getTableName(); 265 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 266 267 // The max result size we expect the scan to use by default. 268 long expectedMaxResultSize = 269 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 270 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 271 272 int numRows = 5; 273 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 274 275 int numQualifiers = 10; 276 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 277 278 // Specify the cell size such that a single row will be larger than the default 279 // value of maxResultSize. This means that Scan RPCs should return at most a single 280 // result back to the client. 281 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 282 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 283 284 Put put; 285 List<Put> puts = new ArrayList<>(); 286 for (int row = 0; row < ROWS.length; row++) { 287 put = new Put(ROWS[row]); 288 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 289 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 290 put.add(kv); 291 } 292 puts.add(put); 293 } 294 ht.put(puts); 295 296 // Create a scan with the default configuration. 297 Scan scan = new Scan(); 298 299 ResultScanner scanner = ht.getScanner(scan); 300 assertTrue(scanner instanceof ClientScanner); 301 ClientScanner clientScanner = (ClientScanner) scanner; 302 303 // Call next to issue a single RPC to the server 304 scanner.next(); 305 306 // The scanner should have, at most, a single result in its cache. If there more results exists 307 // in the cache it means that more than the expected max result size was fetched. 308 assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", 309 clientScanner.getCacheSize() <= 1); 310 } 311 312 /** 313 * Scan on not existing table should throw the exception with correct message 314 */ 315 @Test 316 public void testScannerForNotExistingTable() { 317 String[] tableNames = { "A", "Z", "A:A", "Z:Z" }; 318 for (String tableName : tableNames) { 319 try { 320 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 321 testSmallScan(table, true, 1, 5); 322 fail("TableNotFoundException was not thrown"); 323 } catch (TableNotFoundException e) { 324 // We expect that the message for TableNotFoundException would have only the table name only 325 // Otherwise that would mean that localeRegionInMeta doesn't work properly 326 assertEquals(e.getMessage(), tableName); 327 } catch (Exception e) { 328 fail("Unexpected exception " + e.getMessage()); 329 } 330 } 331 } 332 333 @Test 334 public void testSmallScan() throws Exception { 335 final TableName tableName = name.getTableName(); 336 337 int numRows = 10; 338 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 339 340 int numQualifiers = 10; 341 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 342 343 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 344 345 Put put; 346 List<Put> puts = new ArrayList<>(); 347 for (int row = 0; row < ROWS.length; row++) { 348 put = new Put(ROWS[row]); 349 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 350 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 351 put.add(kv); 352 } 353 puts.add(put); 354 } 355 ht.put(puts); 356 357 int expectedRows = numRows; 358 int expectedCols = numRows * numQualifiers; 359 360 // Test normal and reversed 361 testSmallScan(ht, true, expectedRows, expectedCols); 362 testSmallScan(ht, false, expectedRows, expectedCols); 363 } 364 365 /** 366 * Run through a variety of test configurations with a small scan 367 */ 368 private void testSmallScan(Table table, boolean reversed, int rows, int columns) 369 throws Exception { 370 Scan baseScan = new Scan(); 371 baseScan.setReversed(reversed); 372 baseScan.setSmall(true); 373 374 Scan scan = new Scan(baseScan); 375 verifyExpectedCounts(table, scan, rows, columns); 376 377 scan = new Scan(baseScan); 378 scan.setMaxResultSize(1); 379 verifyExpectedCounts(table, scan, rows, columns); 380 381 scan = new Scan(baseScan); 382 scan.setMaxResultSize(1); 383 scan.setCaching(Integer.MAX_VALUE); 384 verifyExpectedCounts(table, scan, rows, columns); 385 } 386 387 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 388 int expectedCellCount) throws Exception { 389 ResultScanner scanner = table.getScanner(scan); 390 391 int rowCount = 0; 392 int cellCount = 0; 393 Result r = null; 394 while ((r = scanner.next()) != null) { 395 rowCount++; 396 cellCount += r.rawCells().length; 397 } 398 399 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 400 expectedRowCount == rowCount); 401 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 402 expectedCellCount == cellCount); 403 scanner.close(); 404 } 405 406 /** 407 * Test from client side for get with maxResultPerCF set 408 */ 409 @Test 410 public void testGetMaxResults() throws Exception { 411 final TableName tableName = name.getTableName(); 412 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 413 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 414 415 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 416 417 Get get; 418 Put put; 419 Result result; 420 boolean toLog = true; 421 List<Cell> kvListExp; 422 423 kvListExp = new ArrayList<>(); 424 // Insert one CF for row[0] 425 put = new Put(ROW); 426 for (int i = 0; i < 10; i++) { 427 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 428 put.add(kv); 429 kvListExp.add(kv); 430 } 431 ht.put(put); 432 433 get = new Get(ROW); 434 result = ht.get(get); 435 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 436 437 get = new Get(ROW); 438 get.setMaxResultsPerColumnFamily(2); 439 result = ht.get(get); 440 kvListExp = new ArrayList<>(); 441 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 442 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 443 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 444 445 // Filters: ColumnRangeFilter 446 get = new Get(ROW); 447 get.setMaxResultsPerColumnFamily(5); 448 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 449 result = ht.get(get); 450 kvListExp = new ArrayList<>(); 451 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 452 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 453 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 454 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 455 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 456 457 // Insert two more CF for row[0] 458 // 20 columns for CF2, 10 columns for CF1 459 put = new Put(ROW); 460 for (int i = 0; i < QUALIFIERS.length; i++) { 461 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 462 put.add(kv); 463 } 464 ht.put(put); 465 466 put = new Put(ROW); 467 for (int i = 0; i < 10; i++) { 468 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 469 put.add(kv); 470 } 471 ht.put(put); 472 473 get = new Get(ROW); 474 get.setMaxResultsPerColumnFamily(12); 475 get.addFamily(FAMILIES[1]); 476 get.addFamily(FAMILIES[2]); 477 result = ht.get(get); 478 kvListExp = new ArrayList<>(); 479 // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 480 for (int i = 0; i < 10; i++) { 481 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 482 } 483 for (int i = 0; i < 2; i++) { 484 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 485 } 486 for (int i = 10; i < 20; i++) { 487 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 488 } 489 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 490 491 // Filters: ColumnRangeFilter and ColumnPrefixFilter 492 get = new Get(ROW); 493 get.setMaxResultsPerColumnFamily(3); 494 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 495 result = ht.get(get); 496 kvListExp = new ArrayList<>(); 497 for (int i = 2; i < 5; i++) { 498 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 499 } 500 for (int i = 2; i < 5; i++) { 501 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 502 } 503 for (int i = 2; i < 5; i++) { 504 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 505 } 506 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 507 508 get = new Get(ROW); 509 get.setMaxResultsPerColumnFamily(7); 510 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 511 result = ht.get(get); 512 kvListExp = new ArrayList<>(); 513 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 514 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 515 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 516 for (int i = 10; i < 16; i++) { 517 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 518 } 519 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 520 521 } 522 523 /** 524 * Test from client side for scan with maxResultPerCF set 525 */ 526 @Test 527 public void testScanMaxResults() throws Exception { 528 final TableName tableName = name.getTableName(); 529 byte[][] ROWS = HTestConst.makeNAscii(ROW, 2); 530 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 531 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 532 533 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 534 535 Put put; 536 Scan scan; 537 Result result; 538 boolean toLog = true; 539 List<Cell> kvListExp, kvListScan; 540 541 kvListExp = new ArrayList<>(); 542 543 for (int r = 0; r < ROWS.length; r++) { 544 put = new Put(ROWS[r]); 545 for (int c = 0; c < FAMILIES.length; c++) { 546 for (int q = 0; q < QUALIFIERS.length; q++) { 547 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 548 put.add(kv); 549 if (q < 4) { 550 kvListExp.add(kv); 551 } 552 } 553 } 554 ht.put(put); 555 } 556 557 scan = new Scan(); 558 scan.setMaxResultsPerColumnFamily(4); 559 ResultScanner scanner = ht.getScanner(scan); 560 kvListScan = new ArrayList<>(); 561 while ((result = scanner.next()) != null) { 562 for (Cell kv : result.listCells()) { 563 kvListScan.add(kv); 564 } 565 } 566 result = Result.create(kvListScan); 567 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 568 569 } 570 571 /** 572 * Test from client side for get with rowOffset 573 */ 574 @Test 575 public void testGetRowOffset() throws Exception { 576 final TableName tableName = name.getTableName(); 577 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 578 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 579 580 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 581 582 Get get; 583 Put put; 584 Result result; 585 boolean toLog = true; 586 List<Cell> kvListExp; 587 588 // Insert one CF for row 589 kvListExp = new ArrayList<>(); 590 put = new Put(ROW); 591 for (int i = 0; i < 10; i++) { 592 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 593 put.add(kv); 594 // skipping first two kvs 595 if (i < 2) { 596 continue; 597 } 598 kvListExp.add(kv); 599 } 600 ht.put(put); 601 602 // setting offset to 2 603 get = new Get(ROW); 604 get.setRowOffsetPerColumnFamily(2); 605 result = ht.get(get); 606 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 607 608 // setting offset to 20 609 get = new Get(ROW); 610 get.setRowOffsetPerColumnFamily(20); 611 result = ht.get(get); 612 kvListExp = new ArrayList<>(); 613 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 614 615 // offset + maxResultPerCF 616 get = new Get(ROW); 617 get.setRowOffsetPerColumnFamily(4); 618 get.setMaxResultsPerColumnFamily(5); 619 result = ht.get(get); 620 kvListExp = new ArrayList<>(); 621 for (int i = 4; i < 9; i++) { 622 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 623 } 624 verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF"); 625 626 // Filters: ColumnRangeFilter 627 get = new Get(ROW); 628 get.setRowOffsetPerColumnFamily(1); 629 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 630 result = ht.get(get); 631 kvListExp = new ArrayList<>(); 632 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 633 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 634 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 635 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 636 637 // Insert into two more CFs for row 638 // 10 columns for CF2, 10 columns for CF1 639 for (int j = 2; j > 0; j--) { 640 put = new Put(ROW); 641 for (int i = 0; i < 10; i++) { 642 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 643 put.add(kv); 644 } 645 ht.put(put); 646 } 647 648 get = new Get(ROW); 649 get.setRowOffsetPerColumnFamily(4); 650 get.setMaxResultsPerColumnFamily(2); 651 get.addFamily(FAMILIES[1]); 652 get.addFamily(FAMILIES[2]); 653 result = ht.get(get); 654 kvListExp = new ArrayList<>(); 655 // Exp: CF1:q4, q5, CF2: q4, q5 656 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 657 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 658 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 659 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 660 verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults"); 661 } 662 663 @Test 664 public void testScanRawDeleteFamilyVersion() throws Exception { 665 TableName tableName = name.getTableName(); 666 TEST_UTIL.createTable(tableName, FAMILY); 667 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 668 conf.set(RPC_CODEC_CONF_KEY, ""); 669 conf.set(DEFAULT_CODEC_CLASS, ""); 670 try (Connection connection = ConnectionFactory.createConnection(conf); 671 Table table = connection.getTable(tableName)) { 672 Delete delete = new Delete(ROW); 673 delete.addFamilyVersion(FAMILY, 0L); 674 table.delete(delete); 675 Scan scan = new Scan(ROW).setRaw(true); 676 ResultScanner scanner = table.getScanner(scan); 677 int count = 0; 678 while (scanner.next() != null) { 679 count++; 680 } 681 assertEquals(1, count); 682 } finally { 683 TEST_UTIL.deleteTable(tableName); 684 } 685 } 686 687 /** 688 * Test from client side for scan while the region is reopened on the same region server. 689 */ 690 @Test 691 public void testScanOnReopenedRegion() throws Exception { 692 final TableName tableName = name.getTableName(); 693 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 694 695 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 696 697 Put put; 698 Scan scan; 699 Result result; 700 ResultScanner scanner; 701 boolean toLog = false; 702 List<Cell> kvListExp; 703 704 // table: row, family, c0:0, c1:1 705 put = new Put(ROW); 706 for (int i = 0; i < QUALIFIERS.length; i++) { 707 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 708 put.add(kv); 709 } 710 ht.put(put); 711 712 scan = new Scan().withStartRow(ROW); 713 scanner = ht.getScanner(scan); 714 715 HRegionLocation loc; 716 717 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 718 loc = locator.getRegionLocation(ROW); 719 } 720 HRegionInfo hri = loc.getRegionInfo(); 721 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 722 byte[] regionName = hri.getRegionName(); 723 int i = cluster.getServerWith(regionName); 724 HRegionServer rs = cluster.getRegionServer(i); 725 LOG.info("Unassigning " + hri); 726 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 727 long startTime = EnvironmentEdgeManager.currentTime(); 728 long timeOut = 10000; 729 boolean offline = false; 730 while (true) { 731 if (rs.getOnlineRegion(regionName) == null) { 732 offline = true; 733 break; 734 } 735 assertTrue("Timed out in closing the testing region", 736 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 737 } 738 assertTrue(offline); 739 LOG.info("Assigning " + hri); 740 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 741 startTime = EnvironmentEdgeManager.currentTime(); 742 while (true) { 743 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 744 if (rs != null && rs.getOnlineRegion(regionName) != null) { 745 offline = false; 746 break; 747 } 748 assertTrue("Timed out in open the testing region", 749 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 750 } 751 assertFalse(offline); 752 753 // c0:0, c1:1 754 kvListExp = new ArrayList<>(); 755 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 756 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 757 result = scanner.next(); 758 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 759 } 760 761 @Test 762 public void testAsyncScannerWithSmallData() throws Exception { 763 testAsyncScanner(name.getTableName(), 2, 3, 10, -1, null); 764 } 765 766 @Test 767 public void testAsyncScannerWithManyRows() throws Exception { 768 testAsyncScanner(name.getTableName(), 30000, 1, 1, -1, null); 769 } 770 771 @Test 772 public void testAsyncScannerWithoutCaching() throws Exception { 773 testAsyncScanner(name.getTableName(), 5, 1, 1, 1, (b) -> { 774 try { 775 TimeUnit.MILLISECONDS.sleep(500); 776 } catch (InterruptedException ex) { 777 } 778 }); 779 } 780 781 private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, 782 int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { 783 assert rowNumber > 0; 784 assert familyNumber > 0; 785 assert qualifierNumber > 0; 786 byte[] row = Bytes.toBytes("r"); 787 byte[] family = Bytes.toBytes("f"); 788 byte[] qualifier = Bytes.toBytes("q"); 789 byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); 790 byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); 791 byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); 792 793 Table ht = TEST_UTIL.createTable(table, families); 794 795 boolean toLog = true; 796 List<Cell> kvListExp = new ArrayList<>(); 797 798 List<Put> puts = new ArrayList<>(); 799 for (byte[] r : rows) { 800 Put put = new Put(r); 801 for (byte[] f : families) { 802 for (byte[] q : qualifiers) { 803 KeyValue kv = new KeyValue(r, f, q, 1, VALUE); 804 put.add(kv); 805 kvListExp.add(kv); 806 } 807 } 808 puts.add(put); 809 if (puts.size() > 1000) { 810 ht.put(puts); 811 puts.clear(); 812 } 813 } 814 if (!puts.isEmpty()) { 815 ht.put(puts); 816 puts.clear(); 817 } 818 819 Scan scan = new Scan(); 820 scan.setAsyncPrefetch(true); 821 if (caching > 0) { 822 scan.setCaching(caching); 823 } 824 try (ResultScanner scanner = ht.getScanner(scan)) { 825 assertTrue("Not instance of async scanner", scanner instanceof ClientAsyncPrefetchScanner); 826 ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); 827 List<Cell> kvListScan = new ArrayList<>(); 828 Result result; 829 boolean first = true; 830 int actualRows = 0; 831 while ((result = scanner.next()) != null) { 832 ++actualRows; 833 // waiting for cache. see HBASE-17376 834 if (first) { 835 TimeUnit.SECONDS.sleep(1); 836 first = false; 837 } 838 for (Cell kv : result.listCells()) { 839 kvListScan.add(kv); 840 } 841 } 842 assertEquals(rowNumber, actualRows); 843 // These cells may have different rows but it is ok. The Result#getRow 844 // isn't used in the verifyResult() 845 result = Result.create(kvListScan); 846 verifyResult(result, kvListExp, toLog, "Testing async scan"); 847 } 848 849 TEST_UTIL.deleteTable(table); 850 } 851 852 private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { 853 int maxLength = Integer.toString(n).length(); 854 byte[][] ret = new byte[n][]; 855 for (int i = 0; i < n; i++) { 856 int length = Integer.toString(i).length(); 857 StringBuilder buf = new StringBuilder(Integer.toString(i)); 858 IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); 859 byte[] tail = Bytes.toBytes(buf.toString()); 860 ret[i] = Bytes.add(base, tail); 861 } 862 return ret; 863 } 864 865 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) { 866 867 LOG.info(msg); 868 LOG.info("Expected count: " + expKvList.size()); 869 LOG.info("Actual count: " + result.size()); 870 if (expKvList.isEmpty()) { 871 return; 872 } 873 874 int i = 0; 875 for (Cell kv : result.rawCells()) { 876 if (i >= expKvList.size()) { 877 break; // we will check the size later 878 } 879 880 Cell kvExp = expKvList.get(i++); 881 if (toLog) { 882 LOG.info("get kv is: " + kv.toString()); 883 LOG.info("exp kv is: " + kvExp.toString()); 884 } 885 assertTrue("Not equal", kvExp.equals(kv)); 886 } 887 888 assertEquals(expKvList.size(), result.size()); 889 } 890 891 @Test 892 public void testReadExpiredDataForRawScan() throws IOException { 893 TableName tableName = name.getTableName(); 894 long ts = EnvironmentEdgeManager.currentTime() - 10000; 895 byte[] value = Bytes.toBytes("expired"); 896 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 897 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 898 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 899 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 900 new HColumnDescriptor(FAMILY).setTimeToLive(5)); 901 try (ResultScanner scanner = table.getScanner(FAMILY)) { 902 assertNull(scanner.next()); 903 } 904 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 905 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 906 assertNull(scanner.next()); 907 } 908 } 909 } 910 911 @Test 912 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 913 TableName tableName = name.getTableName(); 914 long now = EnvironmentEdgeManager.currentTime(); 915 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 916 for (int i = 0; i < 4; i++) { 917 Put put = new Put(ROW); 918 put.addColumn(FAMILY, QUALIFIER, now + i, VALUE); 919 table.put(put); 920 } 921 922 Scan scan = new Scan(); 923 scan.addColumn(FAMILY, QUALIFIER); 924 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 925 scan.readVersions(3); 926 927 try (ResultScanner scanner = table.getScanner(scan)) { 928 Result result = scanner.next(); 929 assertEquals(3, result.size()); 930 } 931 } 932 } 933 934 @Test 935 public void testScanWithSameStartRowStopRow() throws IOException { 936 TableName tableName = name.getTableName(); 937 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 938 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 939 940 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 941 try (ResultScanner scanner = table.getScanner(scan)) { 942 assertNull(scanner.next()); 943 } 944 945 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 946 try (ResultScanner scanner = table.getScanner(scan)) { 947 Result result = scanner.next(); 948 assertNotNull(result); 949 assertArrayEquals(ROW, result.getRow()); 950 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 951 assertNull(scanner.next()); 952 } 953 954 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 955 try (ResultScanner scanner = table.getScanner(scan)) { 956 assertNull(scanner.next()); 957 } 958 959 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 960 try (ResultScanner scanner = table.getScanner(scan)) { 961 assertNull(scanner.next()); 962 } 963 964 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 965 try (ResultScanner scanner = table.getScanner(scan)) { 966 assertNull(scanner.next()); 967 } 968 } 969 } 970 971 @Test 972 public void testReverseScanWithFlush() throws Exception { 973 TableName tableName = name.getTableName(); 974 final int BATCH_SIZE = 10; 975 final int ROWS_TO_INSERT = 100; 976 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 977 978 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 979 Admin admin = TEST_UTIL.getAdmin()) { 980 List<Put> putList = new ArrayList<>(); 981 for (long i = 0; i < ROWS_TO_INSERT; i++) { 982 Put put = new Put(Bytes.toBytes(i)); 983 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 984 putList.add(put); 985 986 if (putList.size() >= BATCH_SIZE) { 987 table.put(putList); 988 admin.flush(tableName); 989 putList.clear(); 990 } 991 } 992 993 if (!putList.isEmpty()) { 994 table.put(putList); 995 admin.flush(tableName); 996 putList.clear(); 997 } 998 999 Scan scan = new Scan(); 1000 scan.setReversed(true); 1001 int count = 0; 1002 1003 try (ResultScanner results = table.getScanner(scan)) { 1004 for (Result result : results) { 1005 count++; 1006 } 1007 } 1008 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 1009 ROWS_TO_INSERT, count); 1010 } 1011 } 1012 1013 @Test 1014 public void testScannerWithPartialResults() throws Exception { 1015 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 1016 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) { 1017 List<Put> puts = new ArrayList<>(); 1018 byte[] largeArray = new byte[10000]; 1019 Put put = new Put(Bytes.toBytes("aaaa0")); 1020 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1021 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 1022 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 1023 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 1024 puts.add(put); 1025 put = new Put(Bytes.toBytes("aaaa1")); 1026 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1027 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 1028 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 1029 puts.add(put); 1030 table.put(puts); 1031 Scan scan = new Scan(); 1032 scan.addFamily(Bytes.toBytes("c")); 1033 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 1034 scan.setMaxResultSize(10001); 1035 scan.setStopRow(Bytes.toBytes("bbbb")); 1036 scan.setFilter(new LimitKVsReturnFilter()); 1037 ResultScanner rs = table.getScanner(scan); 1038 Result result; 1039 int expectedKvNumber = 6; 1040 int returnedKvNumber = 0; 1041 while ((result = rs.next()) != null) { 1042 returnedKvNumber += result.listCells().size(); 1043 } 1044 rs.close(); 1045 assertEquals(expectedKvNumber, returnedKvNumber); 1046 } 1047 } 1048 1049 public static class LimitKVsReturnFilter extends FilterBase { 1050 1051 private int cellCount = 0; 1052 1053 @Override 1054 public ReturnCode filterCell(Cell v) throws IOException { 1055 if (cellCount >= 6) { 1056 cellCount++; 1057 return ReturnCode.SKIP; 1058 } 1059 cellCount++; 1060 return ReturnCode.INCLUDE; 1061 } 1062 1063 @Override 1064 public boolean filterAllRemaining() throws IOException { 1065 if (cellCount < 7) { 1066 return false; 1067 } 1068 cellCount++; 1069 return true; 1070 } 1071 1072 @Override 1073 public String toString() { 1074 return this.getClass().getSimpleName(); 1075 } 1076 1077 public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes) 1078 throws DeserializationException { 1079 return new LimitKVsReturnFilter(); 1080 } 1081 } 1082}