/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(null,
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = new Job(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
        serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
   * does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }

  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When it uses region location from meta, the hostname will be "localhost",
          // the location from hdfs block location is "127.0.0.1".
          Assert.assertEquals(1, split.getLocations().length);
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("localhost"));
        } else {
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("127.0.0.1"));
        }
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    Assert.assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    Assert.assertFalse(fs.exists(restorePath));
  }
}