001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.hamcrest.CoreMatchers.instanceOf; 024import static org.hamcrest.MatcherAssert.assertThat; 025import static org.junit.Assert.assertArrayEquals; 026import static org.junit.Assert.assertEquals; 027import static org.junit.Assert.assertFalse; 028import static org.junit.Assert.assertNotNull; 029import static org.junit.Assert.assertNull; 030import static org.junit.Assert.assertTrue; 031import static org.junit.Assert.fail; 032 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.List; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CompareOperator; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HRegionLocation; 045import org.apache.hadoop.hbase.HTestConst; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 048import org.apache.hadoop.hbase.StartTestingClusterOption; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNameTestRule; 051import org.apache.hadoop.hbase.TableNotFoundException; 052import org.apache.hadoop.hbase.client.Scan.ReadType; 053import org.apache.hadoop.hbase.exceptions.DeserializationException; 054import org.apache.hadoop.hbase.filter.BinaryComparator; 055import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 056import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.filter.QualifierFilter; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.testclassification.ClientTests; 061import org.apache.hadoop.hbase.testclassification.MediumTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.junit.AfterClass; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 075 076/** 077 * A client-side test, mostly testing scanners with various parameters. Parameterized on different 078 * registry implementations. 079 */ 080@Category({ MediumTests.class, ClientTests.class }) 081@RunWith(Parameterized.class) 082public class TestScannersFromClientSide { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 089 090 private static HBaseTestingUtil TEST_UTIL; 091 private static byte[] ROW = Bytes.toBytes("testRow"); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 094 private static byte[] VALUE = Bytes.toBytes("testValue"); 095 096 @Rule 097 public TableNameTestRule name = new TableNameTestRule(); 098 099 @AfterClass 100 public static void tearDownAfterClass() throws Exception { 101 if (TEST_UTIL != null) { 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 } 105 106 @Parameterized.Parameters 107 public static Collection<Object[]> parameters() { 108 return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 }, 109 { ZKConnectionRegistry.class, 1 } }); 110 } 111 112 /** 113 * JUnit does not provide an easy way to run a hook after each parameterized run. Without that 114 * there is no easy way to restart the test cluster after each parameterized run. Annotation 115 * BeforeParam does not work either because it runs before parameterization and hence does not 116 * have access to the test parameters (which is weird). This *hack* checks if the current instance 117 * of test cluster configuration has the passed parameterized configs. In such a case, we can just 118 * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it 119 * saves a ton of time for the full test and de-flakes it. 120 */ 121 private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { 122 // initialize() is called for every unit test, however we only want to reset the cluster state 123 // at the end of every parameterized run. 124 if (TEST_UTIL == null) { 125 return false; 126 } 127 Configuration conf = TEST_UTIL.getConfiguration(); 128 Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 129 ZKConnectionRegistry.class); 130 int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 131 AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); 132 return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; 133 } 134 135 public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception { 136 if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { 137 return; 138 } 139 if (TEST_UTIL != null) { 140 // We reached the end of a parameterized run, clean up the cluster. 141 TEST_UTIL.shutdownMiniCluster(); 142 } 143 TEST_UTIL = new HBaseTestingUtil(); 144 Configuration conf = TEST_UTIL.getConfiguration(); 145 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 146 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, 147 ConnectionRegistry.class); 148 Preconditions.checkArgument(numHedgedReqs > 0); 149 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); 150 StartTestingClusterOption.Builder builder = StartTestingClusterOption.builder(); 151 // Multiple masters needed only when hedged reads for master registry are enabled. 152 builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); 153 TEST_UTIL.startMiniCluster(builder.build()); 154 } 155 156 /** 157 * Test from client side for batch of scan 158 */ 159 @Test 160 public void testScanBatch() throws Exception { 161 final TableName tableName = name.getTableName(); 162 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 163 164 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 165 166 Put put; 167 Scan scan; 168 Delete delete; 169 Result result; 170 ResultScanner scanner; 171 boolean toLog = true; 172 List<Cell> kvListExp; 173 174 // table: row, family, c0:0, c1:1, ... , c7:7 175 put = new Put(ROW); 176 for (int i = 0; i < QUALIFIERS.length; i++) { 177 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 178 put.add(kv); 179 } 180 ht.put(put); 181 182 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 183 put = new Put(ROW); 184 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 185 put.add(kv); 186 ht.put(put); 187 188 // delete upto ts: 3 189 delete = new Delete(ROW); 190 delete.addFamily(FAMILY, 3); 191 ht.delete(delete); 192 193 // without batch 194 scan = new Scan().withStartRow(ROW); 195 scan.readAllVersions(); 196 scanner = ht.getScanner(scan); 197 198 // c4:4, c5:5, c6:6, c7:7 199 kvListExp = new ArrayList<>(); 200 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 201 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 202 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 203 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 204 result = scanner.next(); 205 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 206 207 // with batch 208 scan = new Scan().withStartRow(ROW); 209 scan.readAllVersions(); 210 scan.setBatch(2); 211 scanner = ht.getScanner(scan); 212 213 // First batch: c4:4, c5:5 214 kvListExp = new ArrayList<>(); 215 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 216 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 217 result = scanner.next(); 218 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 219 220 // Second batch: c6:6, c7:7 221 kvListExp = new ArrayList<>(); 222 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 223 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 224 result = scanner.next(); 225 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 226 227 } 228 229 @Test 230 public void testMaxResultSizeIsSetToDefault() throws Exception { 231 final TableName tableName = name.getTableName(); 232 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 233 234 // The max result size we expect the scan to use by default. 235 long expectedMaxResultSize = 236 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 237 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 238 239 int numRows = 5; 240 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 241 242 int numQualifiers = 10; 243 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 244 245 // Specify the cell size such that a single row will be larger than the default 246 // value of maxResultSize. This means that Scan RPCs should return at most a single 247 // result back to the client. 248 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 249 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 250 251 Put put; 252 List<Put> puts = new ArrayList<>(); 253 for (int row = 0; row < ROWS.length; row++) { 254 put = new Put(ROWS[row]); 255 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 256 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 257 put.add(kv); 258 } 259 puts.add(put); 260 } 261 ht.put(puts); 262 263 // Create a scan with the default configuration. 264 Scan scan = new Scan(); 265 266 try (ResultScanner scanner = ht.getScanner(scan)) { 267 assertThat(scanner, instanceOf(AsyncTableResultScanner.class)); 268 scanner.next(); 269 AsyncTableResultScanner s = (AsyncTableResultScanner) scanner; 270 // The scanner should have, at most, a single result in its cache. If there more results 271 // exists 272 // in the cache it means that more than the expected max result size was fetched. 273 assertTrue("The cache contains: " + s.getCacheSize() + " results", s.getCacheSize() <= 1); 274 } 275 } 276 277 /** 278 * Scan on not existing table should throw the exception with correct message 279 */ 280 @Test 281 public void testScannerForNotExistingTable() { 282 String[] tableNames = { "A", "Z", "A:A", "Z:Z" }; 283 for (String tableName : tableNames) { 284 try { 285 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 286 testSmallScan(table, true, 1, 5); 287 fail("TableNotFoundException was not thrown"); 288 } catch (TableNotFoundException e) { 289 // We expect that the message for TableNotFoundException would have only the table name only 290 // Otherwise that would mean that localeRegionInMeta doesn't work properly 291 assertEquals(e.getMessage(), tableName); 292 } catch (Exception e) { 293 fail("Unexpected exception " + e.getMessage()); 294 } 295 } 296 } 297 298 @Test 299 public void testSmallScan() throws Exception { 300 final TableName tableName = name.getTableName(); 301 302 int numRows = 10; 303 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 304 305 int numQualifiers = 10; 306 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 307 308 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 309 310 Put put; 311 List<Put> puts = new ArrayList<>(); 312 for (int row = 0; row < ROWS.length; row++) { 313 put = new Put(ROWS[row]); 314 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 315 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 316 put.add(kv); 317 } 318 puts.add(put); 319 } 320 ht.put(puts); 321 322 int expectedRows = numRows; 323 int expectedCols = numRows * numQualifiers; 324 325 // Test normal and reversed 326 testSmallScan(ht, true, expectedRows, expectedCols); 327 testSmallScan(ht, false, expectedRows, expectedCols); 328 } 329 330 /** 331 * Run through a variety of test configurations with a small scan 332 */ 333 private void testSmallScan(Table table, boolean reversed, int rows, int columns) 334 throws Exception { 335 Scan baseScan = new Scan(); 336 baseScan.setReversed(reversed); 337 baseScan.setReadType(ReadType.PREAD); 338 339 Scan scan = new Scan(baseScan); 340 verifyExpectedCounts(table, scan, rows, columns); 341 342 scan = new Scan(baseScan); 343 scan.setMaxResultSize(1); 344 verifyExpectedCounts(table, scan, rows, columns); 345 346 scan = new Scan(baseScan); 347 scan.setMaxResultSize(1); 348 scan.setCaching(Integer.MAX_VALUE); 349 verifyExpectedCounts(table, scan, rows, columns); 350 } 351 352 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 353 int expectedCellCount) throws Exception { 354 ResultScanner scanner = table.getScanner(scan); 355 356 int rowCount = 0; 357 int cellCount = 0; 358 Result r = null; 359 while ((r = scanner.next()) != null) { 360 rowCount++; 361 cellCount += r.rawCells().length; 362 } 363 364 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 365 expectedRowCount == rowCount); 366 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 367 expectedCellCount == cellCount); 368 scanner.close(); 369 } 370 371 /** 372 * Test from client side for get with maxResultPerCF set 373 */ 374 @Test 375 public void testGetMaxResults() throws Exception { 376 final TableName tableName = name.getTableName(); 377 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 378 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 379 380 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 381 382 Get get; 383 Put put; 384 Result result; 385 boolean toLog = true; 386 List<Cell> kvListExp; 387 388 kvListExp = new ArrayList<>(); 389 // Insert one CF for row[0] 390 put = new Put(ROW); 391 for (int i = 0; i < 10; i++) { 392 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 393 put.add(kv); 394 kvListExp.add(kv); 395 } 396 ht.put(put); 397 398 get = new Get(ROW); 399 result = ht.get(get); 400 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 401 402 get = new Get(ROW); 403 get.setMaxResultsPerColumnFamily(2); 404 result = ht.get(get); 405 kvListExp = new ArrayList<>(); 406 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 407 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 408 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 409 410 // Filters: ColumnRangeFilter 411 get = new Get(ROW); 412 get.setMaxResultsPerColumnFamily(5); 413 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 414 result = ht.get(get); 415 kvListExp = new ArrayList<>(); 416 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 417 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 418 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 419 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 420 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 421 422 // Insert two more CF for row[0] 423 // 20 columns for CF2, 10 columns for CF1 424 put = new Put(ROW); 425 for (int i = 0; i < QUALIFIERS.length; i++) { 426 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 427 put.add(kv); 428 } 429 ht.put(put); 430 431 put = new Put(ROW); 432 for (int i = 0; i < 10; i++) { 433 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 434 put.add(kv); 435 } 436 ht.put(put); 437 438 get = new Get(ROW); 439 get.setMaxResultsPerColumnFamily(12); 440 get.addFamily(FAMILIES[1]); 441 get.addFamily(FAMILIES[2]); 442 result = ht.get(get); 443 kvListExp = new ArrayList<>(); 444 // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 445 for (int i = 0; i < 10; i++) { 446 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 447 } 448 for (int i = 0; i < 2; i++) { 449 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 450 } 451 for (int i = 10; i < 20; i++) { 452 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 453 } 454 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 455 456 // Filters: ColumnRangeFilter and ColumnPrefixFilter 457 get = new Get(ROW); 458 get.setMaxResultsPerColumnFamily(3); 459 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 460 result = ht.get(get); 461 kvListExp = new ArrayList<>(); 462 for (int i = 2; i < 5; i++) { 463 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 464 } 465 for (int i = 2; i < 5; i++) { 466 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 467 } 468 for (int i = 2; i < 5; i++) { 469 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 470 } 471 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 472 473 get = new Get(ROW); 474 get.setMaxResultsPerColumnFamily(7); 475 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 476 result = ht.get(get); 477 kvListExp = new ArrayList<>(); 478 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 479 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 480 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 481 for (int i = 10; i < 16; i++) { 482 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 483 } 484 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 485 486 } 487 488 /** 489 * Test from client side for scan with maxResultPerCF set 490 */ 491 @Test 492 public void testScanMaxResults() throws Exception { 493 final TableName tableName = name.getTableName(); 494 byte[][] ROWS = HTestConst.makeNAscii(ROW, 2); 495 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 496 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 497 498 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 499 500 Put put; 501 Scan scan; 502 Result result; 503 boolean toLog = true; 504 List<Cell> kvListExp, kvListScan; 505 506 kvListExp = new ArrayList<>(); 507 508 for (int r = 0; r < ROWS.length; r++) { 509 put = new Put(ROWS[r]); 510 for (int c = 0; c < FAMILIES.length; c++) { 511 for (int q = 0; q < QUALIFIERS.length; q++) { 512 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 513 put.add(kv); 514 if (q < 4) { 515 kvListExp.add(kv); 516 } 517 } 518 } 519 ht.put(put); 520 } 521 522 scan = new Scan(); 523 scan.setMaxResultsPerColumnFamily(4); 524 ResultScanner scanner = ht.getScanner(scan); 525 kvListScan = new ArrayList<>(); 526 while ((result = scanner.next()) != null) { 527 for (Cell kv : result.listCells()) { 528 kvListScan.add(kv); 529 } 530 } 531 result = Result.create(kvListScan); 532 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 533 534 } 535 536 /** 537 * Test from client side for get with rowOffset 538 */ 539 @Test 540 public void testGetRowOffset() throws Exception { 541 final TableName tableName = name.getTableName(); 542 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 543 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 544 545 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 546 547 Get get; 548 Put put; 549 Result result; 550 boolean toLog = true; 551 List<Cell> kvListExp; 552 553 // Insert one CF for row 554 kvListExp = new ArrayList<>(); 555 put = new Put(ROW); 556 for (int i = 0; i < 10; i++) { 557 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 558 put.add(kv); 559 // skipping first two kvs 560 if (i < 2) { 561 continue; 562 } 563 kvListExp.add(kv); 564 } 565 ht.put(put); 566 567 // setting offset to 2 568 get = new Get(ROW); 569 get.setRowOffsetPerColumnFamily(2); 570 result = ht.get(get); 571 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 572 573 // setting offset to 20 574 get = new Get(ROW); 575 get.setRowOffsetPerColumnFamily(20); 576 result = ht.get(get); 577 kvListExp = new ArrayList<>(); 578 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 579 580 // offset + maxResultPerCF 581 get = new Get(ROW); 582 get.setRowOffsetPerColumnFamily(4); 583 get.setMaxResultsPerColumnFamily(5); 584 result = ht.get(get); 585 kvListExp = new ArrayList<>(); 586 for (int i = 4; i < 9; i++) { 587 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 588 } 589 verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF"); 590 591 // Filters: ColumnRangeFilter 592 get = new Get(ROW); 593 get.setRowOffsetPerColumnFamily(1); 594 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 595 result = ht.get(get); 596 kvListExp = new ArrayList<>(); 597 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 598 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 599 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 600 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 601 602 // Insert into two more CFs for row 603 // 10 columns for CF2, 10 columns for CF1 604 for (int j = 2; j > 0; j--) { 605 put = new Put(ROW); 606 for (int i = 0; i < 10; i++) { 607 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 608 put.add(kv); 609 } 610 ht.put(put); 611 } 612 613 get = new Get(ROW); 614 get.setRowOffsetPerColumnFamily(4); 615 get.setMaxResultsPerColumnFamily(2); 616 get.addFamily(FAMILIES[1]); 617 get.addFamily(FAMILIES[2]); 618 result = ht.get(get); 619 kvListExp = new ArrayList<>(); 620 // Exp: CF1:q4, q5, CF2: q4, q5 621 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 622 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 623 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 624 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 625 verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults"); 626 } 627 628 @Test 629 public void testScanRawDeleteFamilyVersion() throws Exception { 630 TableName tableName = name.getTableName(); 631 TEST_UTIL.createTable(tableName, FAMILY); 632 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 633 conf.set(RPC_CODEC_CONF_KEY, ""); 634 conf.set(DEFAULT_CODEC_CLASS, ""); 635 try (Connection connection = ConnectionFactory.createConnection(conf); 636 Table table = connection.getTable(tableName)) { 637 Delete delete = new Delete(ROW); 638 delete.addFamilyVersion(FAMILY, 0L); 639 table.delete(delete); 640 Scan scan = new Scan().withStartRow(ROW).setRaw(true); 641 ResultScanner scanner = table.getScanner(scan); 642 int count = 0; 643 while (scanner.next() != null) { 644 count++; 645 } 646 assertEquals(1, count); 647 } finally { 648 TEST_UTIL.deleteTable(tableName); 649 } 650 } 651 652 /** 653 * Test from client side for scan while the region is reopened on the same region server. 654 */ 655 @Test 656 public void testScanOnReopenedRegion() throws Exception { 657 final TableName tableName = name.getTableName(); 658 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 659 660 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 661 662 Put put; 663 Scan scan; 664 Result result; 665 ResultScanner scanner; 666 boolean toLog = false; 667 List<Cell> kvListExp; 668 669 // table: row, family, c0:0, c1:1 670 put = new Put(ROW); 671 for (int i = 0; i < QUALIFIERS.length; i++) { 672 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 673 put.add(kv); 674 } 675 ht.put(put); 676 677 scan = new Scan().withStartRow(ROW); 678 scanner = ht.getScanner(scan); 679 680 HRegionLocation loc; 681 682 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 683 loc = locator.getRegionLocation(ROW); 684 } 685 RegionInfo hri = loc.getRegion(); 686 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 687 byte[] regionName = hri.getRegionName(); 688 int i = cluster.getServerWith(regionName); 689 HRegionServer rs = cluster.getRegionServer(i); 690 LOG.info("Unassigning " + hri); 691 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 692 long startTime = EnvironmentEdgeManager.currentTime(); 693 long timeOut = 10000; 694 boolean offline = false; 695 while (true) { 696 if (rs.getOnlineRegion(regionName) == null) { 697 offline = true; 698 break; 699 } 700 assertTrue("Timed out in closing the testing region", 701 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 702 } 703 assertTrue(offline); 704 LOG.info("Assigning " + hri); 705 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 706 startTime = EnvironmentEdgeManager.currentTime(); 707 while (true) { 708 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 709 if (rs != null && rs.getOnlineRegion(regionName) != null) { 710 offline = false; 711 break; 712 } 713 assertTrue("Timed out in open the testing region", 714 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 715 } 716 assertFalse(offline); 717 718 // c0:0, c1:1 719 kvListExp = new ArrayList<>(); 720 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 721 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 722 result = scanner.next(); 723 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 724 } 725 726 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) { 727 728 LOG.info(msg); 729 LOG.info("Expected count: " + expKvList.size()); 730 LOG.info("Actual count: " + result.size()); 731 if (expKvList.isEmpty()) { 732 return; 733 } 734 735 int i = 0; 736 for (Cell kv : result.rawCells()) { 737 if (i >= expKvList.size()) { 738 break; // we will check the size later 739 } 740 741 Cell kvExp = expKvList.get(i++); 742 if (toLog) { 743 LOG.info("get kv is: " + kv.toString()); 744 LOG.info("exp kv is: " + kvExp.toString()); 745 } 746 assertTrue("Not equal", kvExp.equals(kv)); 747 } 748 749 assertEquals(expKvList.size(), result.size()); 750 } 751 752 @Test 753 public void testReadExpiredDataForRawScan() throws IOException { 754 TableName tableName = name.getTableName(); 755 long ts = EnvironmentEdgeManager.currentTime() - 10000; 756 byte[] value = Bytes.toBytes("expired"); 757 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 758 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 759 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 760 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 761 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setTimeToLive(5).build()); 762 try (ResultScanner scanner = table.getScanner(FAMILY)) { 763 assertNull(scanner.next()); 764 } 765 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 766 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 767 assertNull(scanner.next()); 768 } 769 } 770 } 771 772 @Test 773 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 774 TableName tableName = name.getTableName(); 775 long now = EnvironmentEdgeManager.currentTime(); 776 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 777 for (int i = 0; i < 4; i++) { 778 Put put = new Put(ROW); 779 put.addColumn(FAMILY, QUALIFIER, now + i, VALUE); 780 table.put(put); 781 } 782 783 Scan scan = new Scan(); 784 scan.addColumn(FAMILY, QUALIFIER); 785 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 786 scan.readVersions(3); 787 788 try (ResultScanner scanner = table.getScanner(scan)) { 789 Result result = scanner.next(); 790 assertEquals(3, result.size()); 791 } 792 } 793 } 794 795 @Test 796 public void testScanWithSameStartRowStopRow() throws IOException { 797 TableName tableName = name.getTableName(); 798 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 799 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 800 801 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 802 try (ResultScanner scanner = table.getScanner(scan)) { 803 assertNull(scanner.next()); 804 } 805 806 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 807 try (ResultScanner scanner = table.getScanner(scan)) { 808 Result result = scanner.next(); 809 assertNotNull(result); 810 assertArrayEquals(ROW, result.getRow()); 811 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 812 assertNull(scanner.next()); 813 } 814 815 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 816 try (ResultScanner scanner = table.getScanner(scan)) { 817 assertNull(scanner.next()); 818 } 819 820 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 821 try (ResultScanner scanner = table.getScanner(scan)) { 822 assertNull(scanner.next()); 823 } 824 825 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 826 try (ResultScanner scanner = table.getScanner(scan)) { 827 assertNull(scanner.next()); 828 } 829 } 830 } 831 832 @Test 833 public void testReverseScanWithFlush() throws Exception { 834 TableName tableName = name.getTableName(); 835 final int BATCH_SIZE = 10; 836 final int ROWS_TO_INSERT = 100; 837 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 838 839 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 840 Admin admin = TEST_UTIL.getAdmin()) { 841 List<Put> putList = new ArrayList<>(); 842 for (long i = 0; i < ROWS_TO_INSERT; i++) { 843 Put put = new Put(Bytes.toBytes(i)); 844 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 845 putList.add(put); 846 847 if (putList.size() >= BATCH_SIZE) { 848 table.put(putList); 849 admin.flush(tableName); 850 putList.clear(); 851 } 852 } 853 854 if (!putList.isEmpty()) { 855 table.put(putList); 856 admin.flush(tableName); 857 putList.clear(); 858 } 859 860 Scan scan = new Scan(); 861 scan.setReversed(true); 862 int count = 0; 863 864 try (ResultScanner results = table.getScanner(scan)) { 865 for (Result result : results) { 866 count++; 867 } 868 } 869 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 870 ROWS_TO_INSERT, count); 871 } 872 } 873 874 @Test 875 public void testScannerWithPartialResults() throws Exception { 876 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 877 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) { 878 List<Put> puts = new ArrayList<>(); 879 byte[] largeArray = new byte[10000]; 880 Put put = new Put(Bytes.toBytes("aaaa0")); 881 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 882 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 883 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 884 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 885 puts.add(put); 886 put = new Put(Bytes.toBytes("aaaa1")); 887 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 888 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 889 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 890 puts.add(put); 891 table.put(puts); 892 Scan scan = new Scan(); 893 scan.addFamily(Bytes.toBytes("c")); 894 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 895 scan.setMaxResultSize(10001); 896 scan.withStopRow(Bytes.toBytes("bbbb")); 897 scan.setFilter(new LimitKVsReturnFilter()); 898 ResultScanner rs = table.getScanner(scan); 899 Result result; 900 int expectedKvNumber = 6; 901 int returnedKvNumber = 0; 902 while ((result = rs.next()) != null) { 903 returnedKvNumber += result.listCells().size(); 904 } 905 rs.close(); 906 assertEquals(expectedKvNumber, returnedKvNumber); 907 } 908 } 909 910 public static class LimitKVsReturnFilter extends FilterBase { 911 912 private int cellCount = 0; 913 914 @Override 915 public ReturnCode filterCell(Cell v) throws IOException { 916 if (cellCount >= 6) { 917 cellCount++; 918 return ReturnCode.SKIP; 919 } 920 cellCount++; 921 return ReturnCode.INCLUDE; 922 } 923 924 @Override 925 public boolean filterAllRemaining() throws IOException { 926 if (cellCount < 7) { 927 return false; 928 } 929 cellCount++; 930 return true; 931 } 932 933 @Override 934 public String toString() { 935 return this.getClass().getSimpleName(); 936 } 937 938 public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes) 939 throws DeserializationException { 940 return new LimitKVsReturnFilter(); 941 } 942 } 943}