001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HTestConst; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.UnknownScannerException; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.filter.Filter; 053import org.apache.hadoop.hbase.filter.InclusiveStopFilter; 054import org.apache.hadoop.hbase.filter.PrefixFilter; 055import org.apache.hadoop.hbase.filter.WhileMatchFilter; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.testclassification.RegionServerTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * Test of a long-lived scanner validating as we go. 070 */ 071@Category({ RegionServerTests.class, MediumTests.class }) 072public class TestScanner { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestScanner.class); 077 078 @Rule 079 public TestName name = new TestName(); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class); 082 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 083 084 private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW; 085 private static final byte[][] COLS = { HConstants.CATALOG_FAMILY }; 086 private static final byte[][] EXPLICIT_COLS = 087 { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER, 088 // TODO ryan 089 // HConstants.STARTCODE_QUALIFIER 090 }; 091 092 static final TableDescriptor TESTTABLEDESC = 093 TableDescriptorBuilder.newBuilder(TableName.valueOf("testscanner")) 094 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY) 095 // Ten is an arbitrary number. Keep versions to help debugging. 096 .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024).build()) 097 .build(); 098 099 /** HRegionInfo for root region */ 100 public static final RegionInfo REGION_INFO = 101 RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build(); 102 103 private static final byte[] ROW_KEY = REGION_INFO.getRegionName(); 104 105 private static final long START_CODE = Long.MAX_VALUE; 106 107 private HRegion region; 108 109 private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; 110 final private byte[] col1; 111 112 public TestScanner() { 113 super(); 114 115 firstRowBytes = START_KEY_BYTES; 116 secondRowBytes = START_KEY_BYTES.clone(); 117 // Increment the least significant character so we get to next row. 118 secondRowBytes[START_KEY_BYTES.length - 1]++; 119 thirdRowBytes = START_KEY_BYTES.clone(); 120 thirdRowBytes[START_KEY_BYTES.length - 1] = 121 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 122 col1 = Bytes.toBytes("column1"); 123 } 124 125 /** 126 * Test basic stop row filter works. 127 */ 128 @Test 129 public void testStopRow() throws Exception { 130 byte[] startrow = Bytes.toBytes("bbb"); 131 byte[] stoprow = Bytes.toBytes("ccc"); 132 try { 133 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 134 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 135 List<Cell> results = new ArrayList<>(); 136 // Do simple test of getting one row only first. 137 Scan scan = new Scan().withStartRow(Bytes.toBytes("abc")).withStopRow(Bytes.toBytes("abd")); 138 scan.addFamily(HConstants.CATALOG_FAMILY); 139 140 InternalScanner s = region.getScanner(scan); 141 int count = 0; 142 while (s.next(results)) { 143 count++; 144 } 145 s.close(); 146 assertEquals(0, count); 147 // Now do something a bit more imvolved. 148 scan = new Scan().withStartRow(startrow).withStopRow(stoprow); 149 scan.addFamily(HConstants.CATALOG_FAMILY); 150 151 s = region.getScanner(scan); 152 count = 0; 153 Cell kv = null; 154 results = new ArrayList<>(); 155 for (boolean first = true; s.next(results);) { 156 kv = results.get(0); 157 if (first) { 158 assertTrue(CellUtil.matchingRows(kv, startrow)); 159 first = false; 160 } 161 count++; 162 } 163 assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0); 164 // We got something back. 165 assertTrue(count > 10); 166 s.close(); 167 } finally { 168 HBaseTestingUtil.closeRegionAndWAL(this.region); 169 } 170 } 171 172 void rowPrefixFilter(Scan scan) throws IOException { 173 List<Cell> results = new ArrayList<>(); 174 scan.addFamily(HConstants.CATALOG_FAMILY); 175 InternalScanner s = region.getScanner(scan); 176 boolean hasMore = true; 177 while (hasMore) { 178 hasMore = s.next(results); 179 for (Cell kv : results) { 180 assertEquals((byte) 'a', CellUtil.cloneRow(kv)[0]); 181 assertEquals((byte) 'b', CellUtil.cloneRow(kv)[1]); 182 } 183 results.clear(); 184 } 185 s.close(); 186 } 187 188 void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException { 189 List<Cell> results = new ArrayList<>(); 190 scan.addFamily(HConstants.CATALOG_FAMILY); 191 InternalScanner s = region.getScanner(scan); 192 boolean hasMore = true; 193 while (hasMore) { 194 hasMore = s.next(results); 195 for (Cell kv : results) { 196 assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0); 197 } 198 results.clear(); 199 } 200 s.close(); 201 } 202 203 @Test 204 public void testFilters() throws IOException { 205 try { 206 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 207 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 208 byte[] prefix = Bytes.toBytes("ab"); 209 Filter newFilter = new PrefixFilter(prefix); 210 Scan scan = new Scan(); 211 scan.setFilter(newFilter); 212 rowPrefixFilter(scan); 213 214 byte[] stopRow = Bytes.toBytes("bbc"); 215 newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow)); 216 scan = new Scan(); 217 scan.setFilter(newFilter); 218 rowInclusiveStopFilter(scan, stopRow); 219 220 } finally { 221 HBaseTestingUtil.closeRegionAndWAL(this.region); 222 } 223 } 224 225 /** 226 * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a 227 * UnknownScannerException. HBASE-2503 228 */ 229 @Test 230 public void testRaceBetweenClientAndTimeout() throws Exception { 231 try { 232 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 233 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 234 Scan scan = new Scan(); 235 InternalScanner s = region.getScanner(scan); 236 List<Cell> results = new ArrayList<>(); 237 try { 238 s.next(results); 239 s.close(); 240 s.next(results); 241 fail("We don't want anything more, we should be failing"); 242 } catch (UnknownScannerException ex) { 243 // ok! 244 return; 245 } 246 } finally { 247 HBaseTestingUtil.closeRegionAndWAL(this.region); 248 } 249 } 250 251 /** 252 * The test! 253 */ 254 @Test 255 public void testScanner() throws IOException { 256 try { 257 region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 258 Table table = new RegionAsTable(region); 259 260 // Write information to the meta table 261 262 Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 263 264 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, 265 RegionInfo.toByteArray(REGION_INFO)); 266 table.put(put); 267 268 // What we just committed is in the memstore. Verify that we can get 269 // it back both with scanning and get 270 271 scan(false, null); 272 getRegionInfo(table); 273 274 // Close and re-open 275 276 ((HRegion) region).close(); 277 region = HRegion.openHRegion(region, null); 278 table = new RegionAsTable(region); 279 280 // Verify we can get the data back now that it is on disk. 281 282 scan(false, null); 283 getRegionInfo(table); 284 285 // Store some new information 286 287 String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtil.randomFreePort(); 288 289 put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 290 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address)); 291 292 // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE)); 293 294 table.put(put); 295 296 // Validate that we can still get the HRegionInfo, even though it is in 297 // an older row on disk and there is a newer row in the memstore 298 299 scan(true, address.toString()); 300 getRegionInfo(table); 301 302 // flush cache 303 this.region.flush(true); 304 305 // Validate again 306 307 scan(true, address.toString()); 308 getRegionInfo(table); 309 310 // Close and reopen 311 312 ((HRegion) region).close(); 313 region = HRegion.openHRegion(region, null); 314 table = new RegionAsTable(region); 315 316 // Validate again 317 318 scan(true, address.toString()); 319 getRegionInfo(table); 320 321 // Now update the information again 322 323 address = "bar.foo.com:4321"; 324 325 put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 326 327 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address)); 328 table.put(put); 329 330 // Validate again 331 332 scan(true, address.toString()); 333 getRegionInfo(table); 334 335 // flush cache 336 337 region.flush(true); 338 339 // Validate again 340 341 scan(true, address.toString()); 342 getRegionInfo(table); 343 344 // Close and reopen 345 346 ((HRegion) this.region).close(); 347 this.region = HRegion.openHRegion(region, null); 348 table = new RegionAsTable(this.region); 349 350 // Validate again 351 352 scan(true, address.toString()); 353 getRegionInfo(table); 354 355 } finally { 356 // clean up 357 HBaseTestingUtil.closeRegionAndWAL(this.region); 358 } 359 } 360 361 /** Compare the HRegionInfo we read from HBase to what we stored */ 362 private void validateRegionInfo(byte[] regionBytes) throws IOException { 363 RegionInfo info = RegionInfo.parseFromOrNull(regionBytes); 364 365 assertEquals(REGION_INFO.getRegionId(), info.getRegionId()); 366 assertEquals(0, info.getStartKey().length); 367 assertEquals(0, info.getEndKey().length); 368 assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName())); 369 // assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc())); 370 } 371 372 /** Use a scanner to get the region info and then validate the results */ 373 private void scan(boolean validateStartcode, String serverName) throws IOException { 374 InternalScanner scanner = null; 375 Scan scan = null; 376 List<Cell> results = new ArrayList<>(); 377 byte[][][] scanColumns = { COLS, EXPLICIT_COLS }; 378 for (int i = 0; i < scanColumns.length; i++) { 379 try { 380 scan = new Scan().withStartRow(FIRST_ROW); 381 for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) { 382 scan.addColumn(COLS[0], EXPLICIT_COLS[ii]); 383 } 384 scanner = region.getScanner(scan); 385 while (scanner.next(results)) { 386 assertTrue( 387 hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); 388 byte[] val = CellUtil.cloneValue( 389 getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); 390 validateRegionInfo(val); 391 if (validateStartcode) { 392 // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, 393 // HConstants.STARTCODE_QUALIFIER)); 394 // val = getColumn(results, HConstants.CATALOG_FAMILY, 395 // HConstants.STARTCODE_QUALIFIER).getValue(); 396 assertNotNull(val); 397 assertFalse(val.length == 0); 398 long startCode = Bytes.toLong(val); 399 assertEquals(START_CODE, startCode); 400 } 401 402 if (serverName != null) { 403 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER)); 404 val = CellUtil.cloneValue( 405 getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER)); 406 assertNotNull(val); 407 assertFalse(val.length == 0); 408 String server = Bytes.toString(val); 409 assertEquals(0, server.compareTo(serverName)); 410 } 411 } 412 } finally { 413 InternalScanner s = scanner; 414 scanner = null; 415 if (s != null) { 416 s.close(); 417 } 418 } 419 } 420 } 421 422 private boolean hasColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) { 423 for (Cell kv : kvs) { 424 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) { 425 return true; 426 } 427 } 428 return false; 429 } 430 431 private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) { 432 for (Cell kv : kvs) { 433 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) { 434 return kv; 435 } 436 } 437 return null; 438 } 439 440 /** Use get to retrieve the HRegionInfo and validate it */ 441 private void getRegionInfo(Table table) throws IOException { 442 Get get = new Get(ROW_KEY); 443 get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 444 Result result = table.get(get); 445 byte[] bytes = result.value(); 446 validateRegionInfo(bytes); 447 } 448 449 /** 450 * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update 451 * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910. 452 */ 453 @Test 454 public void testScanAndSyncFlush() throws Exception { 455 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 456 Table hri = new RegionAsTable(region); 457 try { 458 LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), 459 Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); 460 int count = count(hri, -1, false); 461 assertEquals(count, count(hri, 100, false)); // do a sync flush. 462 } catch (Exception e) { 463 LOG.error("Failed", e); 464 throw e; 465 } finally { 466 HBaseTestingUtil.closeRegionAndWAL(this.region); 467 } 468 } 469 470 /** 471 * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the 472 * StoreScanner update readers and the transition from memstore -> snapshot -> store file. 473 */ 474 @Test 475 public void testScanAndRealConcurrentFlush() throws Exception { 476 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 477 Table hri = new RegionAsTable(region); 478 try { 479 LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), 480 Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); 481 int count = count(hri, -1, false); 482 assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush 483 } catch (Exception e) { 484 LOG.error("Failed", e); 485 throw e; 486 } finally { 487 HBaseTestingUtil.closeRegionAndWAL(this.region); 488 } 489 } 490 491 /** 492 * Make sure scanner returns correct result when we run a major compaction with deletes. 493 */ 494 @Test 495 public void testScanAndConcurrentMajorCompact() throws Exception { 496 TableDescriptor htd = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()), 497 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, 498 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 499 this.region = TEST_UTIL.createLocalHRegion(htd, null, null); 500 Table hri = new RegionAsTable(region); 501 502 try { 503 HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, 504 secondRowBytes); 505 HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, 506 secondRowBytes); 507 508 Delete dc = new Delete(firstRowBytes); 509 /* delete column1 of firstRow */ 510 dc.addColumns(fam1, col1); 511 region.delete(dc); 512 region.flush(true); 513 514 HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes, 515 thirdRowBytes); 516 HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes, 517 thirdRowBytes); 518 region.flush(true); 519 520 InternalScanner s = region.getScanner(new Scan()); 521 // run a major compact, column1 of firstRow will be cleaned. 522 region.compact(true); 523 524 List<Cell> results = new ArrayList<>(); 525 s.next(results); 526 527 // make sure returns column2 of firstRow 528 assertTrue("result is not correct, keyValues : " + results, results.size() == 1); 529 assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes)); 530 assertTrue(CellUtil.matchingFamily(results.get(0), fam2)); 531 532 results = new ArrayList<>(); 533 s.next(results); 534 535 // get secondRow 536 assertTrue(results.size() == 2); 537 assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes)); 538 assertTrue(CellUtil.matchingFamily(results.get(0), fam1)); 539 assertTrue(CellUtil.matchingFamily(results.get(1), fam2)); 540 } finally { 541 HBaseTestingUtil.closeRegionAndWAL(this.region); 542 } 543 } 544 545 /** 546 * Count table. 547 * @param hri Region 548 * @param flushIndex At what row we start the flush. 549 * @param concurrent if the flush should be concurrent or sync. 550 * @return Count of rows found. 551 */ 552 private int count(final Table countTable, final int flushIndex, boolean concurrent) 553 throws Exception { 554 LOG.info("Taking out counting scan"); 555 Scan scan = new Scan(); 556 for (byte[] qualifier : EXPLICIT_COLS) { 557 scan.addColumn(HConstants.CATALOG_FAMILY, qualifier); 558 } 559 ResultScanner s = countTable.getScanner(scan); 560 int count = 0; 561 boolean justFlushed = false; 562 while (s.next() != null) { 563 if (justFlushed) { 564 LOG.info("after next() just after next flush"); 565 justFlushed = false; 566 } 567 count++; 568 if (flushIndex == count) { 569 LOG.info("Starting flush at flush index " + flushIndex); 570 Thread t = new Thread() { 571 @Override 572 public void run() { 573 try { 574 region.flush(true); 575 LOG.info("Finishing flush"); 576 } catch (IOException e) { 577 LOG.info("Failed flush cache"); 578 } 579 } 580 }; 581 t.start(); 582 if (!concurrent) { 583 // sync flush 584 t.join(); 585 } 586 LOG.info("Continuing on after kicking off background flush"); 587 justFlushed = true; 588 } 589 } 590 s.close(); 591 LOG.info("Found " + count + " items"); 592 return count; 593 } 594}