/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TableSnapshotScanner}. A mini cluster is started before every test in
 * {@link #setupCluster()} and shut down afterwards in {@link #tearDownCluster()}.
 */
@Category({ LargeTests.class, ClientTests.class })
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
  /** Start row used by the scans in this class; also the first split key of multi-region tables. */
  public static byte[] bbb = Bytes.toBytes("bbb");
  /** Stop row used by the scans in this class; also the last split key of multi-region tables. */
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;
  /** Tracks whether the mini cluster still has to be shut down in {@link #tearDownCluster()}. */
  private boolean clusterUp;

  @Rule
  public TestName name = new TestName();

  /**
   * Polls the region list of {@code tableName} once per second, at most 100 times, and returns as
   * soon as the table has at least {@code expectedRegionSize} regions. If that never happens the
   * method still returns normally (callers rely on that), but a warning is logged so the timeout
   * is visible in the test output.
   * @param util               testing utility whose admin is used to list regions
   * @param tableName          table being split
   * @param expectedRegionSize minimum number of regions to wait for
   * @throws Exception if listing the regions fails or the thread is interrupted while sleeping
   */
  public static void blockUntilSplitFinished(HBaseTestingUtil util, TableName tableName,
    int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        return;
      }
      Thread.sleep(1000);
    }
    LOG.warn("Gave up waiting for table {} to reach {} regions", tableName, expectedRegionSize);
  }

  /**
   * Starts a mini cluster with {@link #NUM_REGION_SERVERS} region servers and data nodes, then
   * caches the master's root directory and its file system for use by the tests.
   */
  @Before
  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(NUM_REGION_SERVERS)
        .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build();
    UTIL.startMiniCluster(option);
    clusterUp = true;
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  /**
   * Shuts the mini cluster down unless a test already cleared {@link #clusterUp} (see
   * {@link #testScanner(HBaseTestingUtil, String, int, boolean)}).
   */
  @After
  public void tearDownCluster() throws Exception {
    if (clusterUp) {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Applies the configuration needed by these tests before the cluster starts.
   * @param conf configuration to modify in place
   */
  protected void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  /**
   * (Re)creates {@code tableName}, loads it, takes and validates a snapshot, and then overwrites
   * the table with different values and flushes, so that the live table no longer matches the
   * snapshot content.
   * @param util         testing utility bound to the running cluster
   * @param tableName    table to create
   * @param snapshotName name of the snapshot to take
   * @param numRegions   number of regions; values above 1 pre-split the table between
   *                     {@link #bbb} and {@link #yyy}
   * @throws Exception if any cluster operation fails
   */
  public static void createTableAndSnapshot(HBaseTestingUtil util, TableName tableName,
    String snapshotName, int numRegions) throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ignored) {
      // Best effort: a failure here (e.g. the table does not exist) must not abort the setup.
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();
    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    // try-with-resources so the table handle is released even when a step below throws
    try (Table table = util.getConnection().getTable(tableName)) {
      // put some stuff in the table
      util.loadTable(table, FAMILIES);

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      util.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
    }
  }

  /**
   * Splits a loaded table in two, snapshots it, and verifies that scanning the snapshot sees the
   * expected rows between {@link #bbb} and {@link #yyy}. Any exception now fails the test instead
   * of being printed and swallowed.
   */
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();
      Path snapshotRootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem snapshotFs = snapshotRootDir.getFileSystem(UTIL.getConfiguration());

      try (Table table = UTIL.getConnection().getTable(tableName)) {
        // put some stuff in the table
        UTIL.loadTable(table, FAMILIES);

        // split to 2 regions
        admin.split(tableName, Bytes.toBytes("eee"));
        blockUntilSplitFinished(UTIL, tableName, 2);

        SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
          null, snapshotName, snapshotRootDir, snapshotFs, true);

        // load different values
        byte[] value = Bytes.toBytes("after_snapshot_value");
        UTIL.loadTable(table, FAMILIES, value);

        // cause flush to create new files in the region
        admin.flush(tableName);
      }

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Verifies that a scan with {@code setLimit(100)} over a 50-region snapshot returns exactly 100
   * results.
   */
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan

      int count = 0;
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      Assert.assertEquals(100, count);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 20, true);
  }

  /**
   * Exercises both {@link TableSnapshotScanner} constructors used in this class: the one that is
   * only given a restore directory, and the six-argument one that is additionally given the root
   * directory and a trailing {@code true} flag, used here after the snapshot has been copied with
   * {@link RestoreSnapshotHelper#copySnapshotForScanner}.
   */
  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path snapshotRootDir = CommonFSUtils.getRootDir(conf);

      try (TableSnapshotScanner scanner0 =
        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan)) {
        verifyScanner(scanner0, bbb, yyy);
      }

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, snapshotRootDir, restoreDir,
        snapshotName);

      // scan the snapshot without restoring snapshot
      verifyScanOfRestoredSnapshot(conf, snapshotRootDir, restoreDir, snapshotName, scan);

      // scan again: this only succeeds if closing the previous scanner left the restored files
      verifyScanOfRestoredSnapshot(conf, snapshotRootDir, restoreDir, snapshotName, scan);

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, snapshotRootDir, restoreDir,
        snapshotName);

      // and the snapshot must still be scannable after the second restore
      verifyScanOfRestoredSnapshot(conf, snapshotRootDir, restoreDir, snapshotName, scan);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Opens a {@link TableSnapshotScanner} through the six-argument constructor with the trailing
   * flag set to {@code true}, verifies the rows between {@link #bbb} and {@link #yyy}, and closes
   * the scanner. NOTE(review): the flag presumably means "snapshot already restored" - confirm
   * against the constructor's documentation.
   */
  private void verifyScanOfRestoredSnapshot(Configuration conf, Path snapshotRootDir,
    Path restoreDir, String snapshotName, Scan scan) throws IOException, InterruptedException {
    try (TableSnapshotScanner scanner =
      new TableSnapshotScanner(conf, snapshotRootDir, restoreDir, snapshotName, scan, true)) {
      verifyScanner(scanner, bbb, yyy);
    }
  }

  /**
   * Creates a table plus snapshot and verifies a snapshot scan between {@link #bbb} and
   * {@link #yyy}.
   * @param util            testing utility bound to the running cluster
   * @param snapshotName    name of the snapshot to create and scan
   * @param numRegions      number of regions of the test table
   * @param shutdownCluster when {@code true}, the HBase cluster is shut down before scanning; the
   *                        snapshot and table are then not deleted afterwards and
   *                        {@link #clusterUp} is cleared
   */
  private void testScanner(HBaseTestingUtil util, String snapshotName, int numRegions,
    boolean shutdownCluster) throws Exception {
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
        clusterUp = false;
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(util.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      if (clusterUp) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  /**
   * Drains {@code scanner}, checks every result with {@link #verifyRow(Result)} and validates the
   * set of seen rows with a {@link HBaseTestingUtil.SeenRowTracker} built from the given bounds.
   * The scanner is not closed here; that is the caller's job.
   */
  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
    throws IOException, InterruptedException {

    HBaseTestingUtil.SeenRowTracker rowTracker =
      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);

    Result result;
    while ((result = scanner.next()) != null) {
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  /**
   * Asserts that every cell of {@code result} carries the result's row key and that, for each
   * family in {@link #FAMILIES}, the value stored under the column whose qualifier equals the
   * family name is the row key itself.
   */
  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(),
        cell.getRowOffset(), cell.getRowLength()));
    }

    for (byte[] family : FAMILIES) {
      byte[] actual = result.getValue(family, family);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
        + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  /**
   * Merges two of three regions, snapshots the table, major-compacts the merged region, lets the
   * catalog janitor and the HFile cleaner run, and finally verifies that the snapshot can still be
   * scanned.
   */
  @Test
  public void testMergeRegion() throws Exception {
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
        .collect(Collectors.toList());
      // create table with 3 regions and put some data in it
      try (Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3)) {
        UTIL.loadTable(table, FAMILIES);
      }
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      admin.flush(tableName);
      // wait flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              for (FileStatus fileStatus : fs.listStatus(familyDir)) {
                if (!fileStatus.getPath().getName().equals(".filelist")) {
                  return true;
                }
              }
              // NOTE(review): the verdict is decided by the first family directory encountered;
              // later directories and regions are not inspected. Logic kept unchanged - confirm
              // whether every store was meant to be checked.
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed check if flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      // the merged region is whichever of the two remaining regions is not the untouched region2
      RegionInfo mergedRegion =
        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
          ? mergedRegions.get(1)
          : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until merged region has no reference
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
            .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (
                subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName())
              ) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          Path regionDir = new Path(tableDir, mergedRegion.getEncodedName());
          for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
            StoreContext storeContext = StoreContext.getBuilder()
              .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(familyDir.getName()))
              .withRegionFileSystem(regionFs).withFamilyStoreDirectoryPath(familyDir).build();
            StoreFileTracker sft =
              StoreFileTrackerFactory.create(UTIL.getConfiguration(), false, storeContext);
            if (sft.hasReferences()) {
              // at least one store still has references; keep waiting
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed check merged region has no reference", e);
          return false;
        }
      });
      // run catalog janitor to clean and wait for parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Check if parent regions are archived error", e);
          return false;
        }
      });
      // set file modify time and then run cleaner
      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
      // scan snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      // keep the original exception as the cause so the failure report shows its stack trace
      throw new AssertionError("Should not throw Exception: " + e.getMessage(), e);
    }
  }

  /**
   * Verifies that a snapshot taken after a region merge can be scanned both before and after the
   * source table is disabled and deleted.
   */
  @Test
  public void testDeleteTableWithMergedRegions() throws Exception {
    final TableName tableName = TableName.valueOf(this.name.getMethodName());
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    try (Admin admin = UTIL.getConnection().getAdmin()) {
      // disable compaction
      admin.compactionSwitch(false,
        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
      // create table and write some data
      try (Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3)) {
        UTIL.loadTable(table, FAMILIES);
      }
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      // merge region
      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
        regions.get(1).getEncodedNameAsBytes() }, false).get();
      regions = admin.getRegions(tableName);
      Assert.assertEquals(2, regions.size());
      // snapshot
      admin.snapshot(snapshotName, tableName);
      // verify snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
      // drop table
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
      // verify snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    }
  }

  /**
   * Sets the modification time of {@code path} and, when it is a directory, of every file and
   * directory below it. The access time is passed as {@code -1} throughout.
   * @param path root of the tree to touch
   * @param time modification time to set, in the units expected by {@code FileSystem.setTimes}
   * @throws IOException if a file system call fails
   */
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (!fs.isDirectory(path)) {
      return;
    }
    for (FileStatus child : fs.listStatus(path)) {
      if (child.isDirectory()) {
        traverseAndSetFileTime(child.getPath(), time);
      } else if (child.isFile()) {
        fs.setTimes(child.getPath(), time, -1);
      }
    }
  }
}