/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
      INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
          assertTrue(job.getReducerClass().equals(TextSortReducer.class));
          assertTrue(job.getMapOutputValueClass().equals(Text.class));
          return 0;
        }
      }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testMRNoMatchedColumnFamily() throws Exception {
    util.createTable(tn, FAMILY);

    String[] args = new String[] {
      "-D" + ImportTsv.COLUMNS_CONF_KEY
        + "=HBASE_ROW_KEY,FAM:A,FAM01_ERROR:A,FAM01_ERROR:B,FAM02_ERROR:C",
      tn.getNameAsString(), "/inputFile" };
    exception.expect(NoSuchColumnFamilyException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));

    util.deleteTable(tn);
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
          return 0;
        }
      }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in non-bulk mode, dry run should fail just like normal mode.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in bulk mode and create.table is not set to yes, import should fail
   * with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If there are invalid data rows as inputs, then only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input. 2 rows are valid and 1 row is invalid as it doesn't have a TS.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid, and only 3 of the 4 columns are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on the instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
    throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
    String[] argsArray = new String[args.size() + 2];
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    int i = 0;
    while (it.hasNext()) {
      Map.Entry<String, String> pair = it.next();
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
          ImportTsv.BULK_OUTPUT_CONF_KEY), fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(
            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(String.format(
        "HFile output contains a column family (%s) not present in input families (%s)", cf,
        configFamilies), configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
      foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(
        String.format("KV count in output hfile=<%d> doesn't match with expected KV count=<%d>",
          actualKVCount, expectedKVCount),
        actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given hfile.
   * @param fs File System
   * @param p HFile path
   * @return KV count in the given hfile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    // seekTo() positions the scanner at the first cell; count it, then advance through the rest.
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}