001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HColumnDescriptor; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionInfo; 039import org.apache.hadoop.hbase.HTableDescriptor; 040import org.apache.hadoop.hbase.HTestConst; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.UnknownScannerException; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.filter.Filter; 051import org.apache.hadoop.hbase.filter.InclusiveStopFilter; 052import org.apache.hadoop.hbase.filter.PrefixFilter; 053import org.apache.hadoop.hbase.filter.WhileMatchFilter; 054import org.apache.hadoop.hbase.testclassification.MediumTests; 055import org.apache.hadoop.hbase.testclassification.RegionServerTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * Test of a long-lived scanner validating as we go. 068 */ 069@Category({ RegionServerTests.class, MediumTests.class }) 070public class TestScanner { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestScanner.class); 075 076 @Rule 077 public TestName name = new TestName(); 078 079 private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class); 080 private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 081 082 private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW; 083 private static final byte[][] COLS = { HConstants.CATALOG_FAMILY }; 084 private static final byte[][] EXPLICIT_COLS = 085 { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER, 086 // TODO ryan 087 // HConstants.STARTCODE_QUALIFIER 088 }; 089 090 static final HTableDescriptor TESTTABLEDESC = 091 new HTableDescriptor(TableName.valueOf("testscanner")); 092 static { 093 TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY) 094 // Ten is an arbitrary number. Keep versions to help debugging. 095 .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024)); 096 } 097 /** HRegionInfo for root region */ 098 public static final HRegionInfo REGION_INFO = new HRegionInfo(TESTTABLEDESC.getTableName(), 099 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); 100 101 private static final byte[] ROW_KEY = REGION_INFO.getRegionName(); 102 103 private static final long START_CODE = Long.MAX_VALUE; 104 105 private HRegion region; 106 107 private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; 108 final private byte[] col1; 109 110 public TestScanner() { 111 super(); 112 113 firstRowBytes = START_KEY_BYTES; 114 secondRowBytes = START_KEY_BYTES.clone(); 115 // Increment the least significant character so we get to next row. 116 secondRowBytes[START_KEY_BYTES.length - 1]++; 117 thirdRowBytes = START_KEY_BYTES.clone(); 118 thirdRowBytes[START_KEY_BYTES.length - 1] = 119 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 120 col1 = Bytes.toBytes("column1"); 121 } 122 123 /** 124 * Test basic stop row filter works. 125 */ 126 @Test 127 public void testStopRow() throws Exception { 128 byte[] startrow = Bytes.toBytes("bbb"); 129 byte[] stoprow = Bytes.toBytes("ccc"); 130 try { 131 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 132 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 133 List<Cell> results = new ArrayList<>(); 134 // Do simple test of getting one row only first. 135 Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd")); 136 scan.addFamily(HConstants.CATALOG_FAMILY); 137 138 InternalScanner s = region.getScanner(scan); 139 int count = 0; 140 while (s.next(results)) { 141 count++; 142 } 143 s.close(); 144 assertEquals(0, count); 145 // Now do something a bit more imvolved. 146 scan = new Scan(startrow, stoprow); 147 scan.addFamily(HConstants.CATALOG_FAMILY); 148 149 s = region.getScanner(scan); 150 count = 0; 151 Cell kv = null; 152 results = new ArrayList<>(); 153 for (boolean first = true; s.next(results);) { 154 kv = results.get(0); 155 if (first) { 156 assertTrue(CellUtil.matchingRows(kv, startrow)); 157 first = false; 158 } 159 count++; 160 } 161 assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0); 162 // We got something back. 163 assertTrue(count > 10); 164 s.close(); 165 } finally { 166 HBaseTestingUtility.closeRegionAndWAL(this.region); 167 } 168 } 169 170 void rowPrefixFilter(Scan scan) throws IOException { 171 List<Cell> results = new ArrayList<>(); 172 scan.addFamily(HConstants.CATALOG_FAMILY); 173 InternalScanner s = region.getScanner(scan); 174 boolean hasMore = true; 175 while (hasMore) { 176 hasMore = s.next(results); 177 for (Cell kv : results) { 178 assertEquals((byte) 'a', CellUtil.cloneRow(kv)[0]); 179 assertEquals((byte) 'b', CellUtil.cloneRow(kv)[1]); 180 } 181 results.clear(); 182 } 183 s.close(); 184 } 185 186 void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException { 187 List<Cell> results = new ArrayList<>(); 188 scan.addFamily(HConstants.CATALOG_FAMILY); 189 InternalScanner s = region.getScanner(scan); 190 boolean hasMore = true; 191 while (hasMore) { 192 hasMore = s.next(results); 193 for (Cell kv : results) { 194 assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0); 195 } 196 results.clear(); 197 } 198 s.close(); 199 } 200 201 @Test 202 public void testFilters() throws IOException { 203 try { 204 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 205 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 206 byte[] prefix = Bytes.toBytes("ab"); 207 Filter newFilter = new PrefixFilter(prefix); 208 Scan scan = new Scan(); 209 scan.setFilter(newFilter); 210 rowPrefixFilter(scan); 211 212 byte[] stopRow = Bytes.toBytes("bbc"); 213 newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow)); 214 scan = new Scan(); 215 scan.setFilter(newFilter); 216 rowInclusiveStopFilter(scan, stopRow); 217 218 } finally { 219 HBaseTestingUtility.closeRegionAndWAL(this.region); 220 } 221 } 222 223 /** 224 * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a 225 * UnknownScannerException. HBASE-2503 226 */ 227 @Test 228 public void testRaceBetweenClientAndTimeout() throws Exception { 229 try { 230 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 231 HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY); 232 Scan scan = new Scan(); 233 InternalScanner s = region.getScanner(scan); 234 List<Cell> results = new ArrayList<>(); 235 try { 236 s.next(results); 237 s.close(); 238 s.next(results); 239 fail("We don't want anything more, we should be failing"); 240 } catch (UnknownScannerException ex) { 241 // ok! 242 return; 243 } 244 } finally { 245 HBaseTestingUtility.closeRegionAndWAL(this.region); 246 } 247 } 248 249 /** 250 * The test! 251 */ 252 @Test 253 public void testScanner() throws IOException { 254 try { 255 region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 256 Table table = new RegionAsTable(region); 257 258 // Write information to the meta table 259 260 Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 261 262 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, 263 REGION_INFO.toByteArray()); 264 table.put(put); 265 266 // What we just committed is in the memstore. Verify that we can get 267 // it back both with scanning and get 268 269 scan(false, null); 270 getRegionInfo(table); 271 272 // Close and re-open 273 274 ((HRegion) region).close(); 275 region = HRegion.openHRegion(region, null); 276 table = new RegionAsTable(region); 277 278 // Verify we can get the data back now that it is on disk. 279 280 scan(false, null); 281 getRegionInfo(table); 282 283 // Store some new information 284 285 String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort(); 286 287 put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 288 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address)); 289 290 // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE)); 291 292 table.put(put); 293 294 // Validate that we can still get the HRegionInfo, even though it is in 295 // an older row on disk and there is a newer row in the memstore 296 297 scan(true, address.toString()); 298 getRegionInfo(table); 299 300 // flush cache 301 this.region.flush(true); 302 303 // Validate again 304 305 scan(true, address.toString()); 306 getRegionInfo(table); 307 308 // Close and reopen 309 310 ((HRegion) region).close(); 311 region = HRegion.openHRegion(region, null); 312 table = new RegionAsTable(region); 313 314 // Validate again 315 316 scan(true, address.toString()); 317 getRegionInfo(table); 318 319 // Now update the information again 320 321 address = "bar.foo.com:4321"; 322 323 put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime()); 324 325 put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address)); 326 table.put(put); 327 328 // Validate again 329 330 scan(true, address.toString()); 331 getRegionInfo(table); 332 333 // flush cache 334 335 region.flush(true); 336 337 // Validate again 338 339 scan(true, address.toString()); 340 getRegionInfo(table); 341 342 // Close and reopen 343 344 ((HRegion) this.region).close(); 345 this.region = HRegion.openHRegion(region, null); 346 table = new RegionAsTable(this.region); 347 348 // Validate again 349 350 scan(true, address.toString()); 351 getRegionInfo(table); 352 353 } finally { 354 // clean up 355 HBaseTestingUtility.closeRegionAndWAL(this.region); 356 } 357 } 358 359 /** Compare the HRegionInfo we read from HBase to what we stored */ 360 private void validateRegionInfo(byte[] regionBytes) throws IOException { 361 HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes); 362 363 assertEquals(REGION_INFO.getRegionId(), info.getRegionId()); 364 assertEquals(0, info.getStartKey().length); 365 assertEquals(0, info.getEndKey().length); 366 assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName())); 367 // assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc())); 368 } 369 370 /** Use a scanner to get the region info and then validate the results */ 371 private void scan(boolean validateStartcode, String serverName) throws IOException { 372 InternalScanner scanner = null; 373 Scan scan = null; 374 List<Cell> results = new ArrayList<>(); 375 byte[][][] scanColumns = { COLS, EXPLICIT_COLS }; 376 for (int i = 0; i < scanColumns.length; i++) { 377 try { 378 scan = new Scan(FIRST_ROW); 379 for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) { 380 scan.addColumn(COLS[0], EXPLICIT_COLS[ii]); 381 } 382 scanner = region.getScanner(scan); 383 while (scanner.next(results)) { 384 assertTrue( 385 hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); 386 byte[] val = CellUtil.cloneValue( 387 getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); 388 validateRegionInfo(val); 389 if (validateStartcode) { 390 // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, 391 // HConstants.STARTCODE_QUALIFIER)); 392 // val = getColumn(results, HConstants.CATALOG_FAMILY, 393 // HConstants.STARTCODE_QUALIFIER).getValue(); 394 assertNotNull(val); 395 assertFalse(val.length == 0); 396 long startCode = Bytes.toLong(val); 397 assertEquals(START_CODE, startCode); 398 } 399 400 if (serverName != null) { 401 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER)); 402 val = CellUtil.cloneValue( 403 getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER)); 404 assertNotNull(val); 405 assertFalse(val.length == 0); 406 String server = Bytes.toString(val); 407 assertEquals(0, server.compareTo(serverName)); 408 } 409 } 410 } finally { 411 InternalScanner s = scanner; 412 scanner = null; 413 if (s != null) { 414 s.close(); 415 } 416 } 417 } 418 } 419 420 private boolean hasColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) { 421 for (Cell kv : kvs) { 422 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) { 423 return true; 424 } 425 } 426 return false; 427 } 428 429 private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) { 430 for (Cell kv : kvs) { 431 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) { 432 return kv; 433 } 434 } 435 return null; 436 } 437 438 /** Use get to retrieve the HRegionInfo and validate it */ 439 private void getRegionInfo(Table table) throws IOException { 440 Get get = new Get(ROW_KEY); 441 get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 442 Result result = table.get(get); 443 byte[] bytes = result.value(); 444 validateRegionInfo(bytes); 445 } 446 447 /** 448 * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update 449 * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910. 450 */ 451 @Test 452 public void testScanAndSyncFlush() throws Exception { 453 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 454 Table hri = new RegionAsTable(region); 455 try { 456 LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), 457 Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); 458 int count = count(hri, -1, false); 459 assertEquals(count, count(hri, 100, false)); // do a sync flush. 460 } catch (Exception e) { 461 LOG.error("Failed", e); 462 throw e; 463 } finally { 464 HBaseTestingUtility.closeRegionAndWAL(this.region); 465 } 466 } 467 468 /** 469 * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the 470 * StoreScanner update readers and the transition from memstore -> snapshot -> store file. 471 */ 472 @Test 473 public void testScanAndRealConcurrentFlush() throws Exception { 474 this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null); 475 Table hri = new RegionAsTable(region); 476 try { 477 LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), 478 Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); 479 int count = count(hri, -1, false); 480 assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush 481 } catch (Exception e) { 482 LOG.error("Failed", e); 483 throw e; 484 } finally { 485 HBaseTestingUtility.closeRegionAndWAL(this.region); 486 } 487 } 488 489 /** 490 * Make sure scanner returns correct result when we run a major compaction with deletes. 491 */ 492 @Test 493 @SuppressWarnings("deprecation") 494 public void testScanAndConcurrentMajorCompact() throws Exception { 495 HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName()); 496 this.region = TEST_UTIL.createLocalHRegion(htd, null, null); 497 Table hri = new RegionAsTable(region); 498 499 try { 500 HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, 501 secondRowBytes); 502 HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, 503 secondRowBytes); 504 505 Delete dc = new Delete(firstRowBytes); 506 /* delete column1 of firstRow */ 507 dc.addColumns(fam1, col1); 508 region.delete(dc); 509 region.flush(true); 510 511 HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes, 512 thirdRowBytes); 513 HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes, 514 thirdRowBytes); 515 region.flush(true); 516 517 InternalScanner s = region.getScanner(new Scan()); 518 // run a major compact, column1 of firstRow will be cleaned. 519 region.compact(true); 520 521 List<Cell> results = new ArrayList<>(); 522 s.next(results); 523 524 // make sure returns column2 of firstRow 525 assertTrue("result is not correct, keyValues : " + results, results.size() == 1); 526 assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes)); 527 assertTrue(CellUtil.matchingFamily(results.get(0), fam2)); 528 529 results = new ArrayList<>(); 530 s.next(results); 531 532 // get secondRow 533 assertTrue(results.size() == 2); 534 assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes)); 535 assertTrue(CellUtil.matchingFamily(results.get(0), fam1)); 536 assertTrue(CellUtil.matchingFamily(results.get(1), fam2)); 537 } finally { 538 HBaseTestingUtility.closeRegionAndWAL(this.region); 539 } 540 } 541 542 /** 543 * Count table. 544 * @param hri Region 545 * @param flushIndex At what row we start the flush. 546 * @param concurrent if the flush should be concurrent or sync. 547 * @return Count of rows found. 548 */ 549 private int count(final Table countTable, final int flushIndex, boolean concurrent) 550 throws Exception { 551 LOG.info("Taking out counting scan"); 552 Scan scan = new Scan(); 553 for (byte[] qualifier : EXPLICIT_COLS) { 554 scan.addColumn(HConstants.CATALOG_FAMILY, qualifier); 555 } 556 ResultScanner s = countTable.getScanner(scan); 557 int count = 0; 558 boolean justFlushed = false; 559 while (s.next() != null) { 560 if (justFlushed) { 561 LOG.info("after next() just after next flush"); 562 justFlushed = false; 563 } 564 count++; 565 if (flushIndex == count) { 566 LOG.info("Starting flush at flush index " + flushIndex); 567 Thread t = new Thread() { 568 @Override 569 public void run() { 570 try { 571 region.flush(true); 572 LOG.info("Finishing flush"); 573 } catch (IOException e) { 574 LOG.info("Failed flush cache"); 575 } 576 } 577 }; 578 t.start(); 579 if (!concurrent) { 580 // sync flush 581 t.join(); 582 } 583 LOG.info("Continuing on after kicking off background flush"); 584 justFlushed = true; 585 } 586 } 587 s.close(); 588 LOG.info("Found " + count + " items"); 589 return count; 590 } 591}