/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;

@Category({ MapReduceTests.class, LargeTests.class })
public class TestImportTSVWithVisibilityLabels implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTSVWithVisibilityLabels.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestImportTSVWithVisibilityLabels.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private final static String TOPSECRET = "topsecret";
  private final static String PUBLIC = "public";
  private final static String PRIVATE = "private";
  private final static String CONFIDENTIAL = "confidential";
  private final static String SECRET = "secret";
  private static User SUPERUSER;
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
    conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
      ScanLabelGenerator.class);
    util.startMiniCluster();
    // Wait for the labels table to become available
    util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
    createLabels();
  }

  private static void createLabels() throws IOException, InterruptedException {
    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
      new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
        @Override
        public VisibilityLabelsResponse run() throws Exception {
          String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE };
          try (Connection conn = ConnectionFactory.createConnection(conf)) {
            VisibilityClient.addLabels(conn, labels);
            LOG.info("Added labels");
          } catch (Throwable t) {
            LOG.error("Error in adding labels", t);
            throw new IOException(t);
          }
          return null;
        }
      };
    SUPERUSER.runAs(action);
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testMROnTable() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
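    // The column spec maps the TSV fields to HBASE_ROW_KEY, two qualifiers in FAM, and the
    // special HBASE_CELL_VISIBILITY column, which attaches the visibility expression to each row.
    // The non-printing \u001b (ASCII ESC) character is used as the field separator.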
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testMROnTableWithDeletes() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    issueDeleteAndVerifyData(tableName);
    util.deleteTable(tableName);
  }

  private void issueDeleteAndVerifyData(TableName tableName) throws IOException {
    LOG.debug("Validating table after delete.");
    Table table = util.getConnection().getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Delete d = new Delete(Bytes.toBytes("KEY"));
        d.addFamily(Bytes.toBytes(FAMILY));
        d.setCellVisibility(new CellVisibility("private&secret"));
        table.delete(d);

        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(FAMILY));
        scan.setAuthorizations(new Authorizations("secret", "private"));
        ResultScanner resScanner = table.getScanner(scan);
        Result[] next = resScanner.next(5);
        assertEquals(0, next.length);
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    assertTrue(verified);
  }

  @Test
  public void testMROnTableWithBulkload() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
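    // Setting BULK_OUTPUT_CONF_KEY makes ImportTsv write HFiles under the given path rather than
    // issuing Puts directly against the table, so doMROnTableTest validates the HFiles instead of
    // the table contents.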
    String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    String FAMILY = "FAM";
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
      table.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    doMROnTableTest(util, FAMILY, data, args, 4);
    util.deleteTable(table);
  }

  @Test
  public void testMRWithOutputFormat() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithInvalidLabels() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
    String[] args = new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // Two data rows, one with a valid label and one with an invalid label.
    String data =
      "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
    // Prepare the arguments required for the test.
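    // Same invalid-label scenario as testBulkOutputWithInvalidLabels above, routed through
    // TsvImporterTextMapper instead of the default mapper.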
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };

    // Two data rows, one with a valid label and one with an invalid label.
    String data =
      "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, 2);
    util.deleteTable(tableName);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on the instance's util/conf
   * facilities. All arguments are passed BEFORE the input file path is appended.
   * @param expectedKVCount Expected KV count. Pass -1 to skip the KV count check.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier, int expectedKVCount) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + argv);
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean createdHFiles = false;
    String outputPath = null;
    for (String arg : argv) {
      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
        createdHFiles = true;
        // split '-Dfoo=bar' on '=' and keep 'bar'
        outputPath = arg.split("=")[1];
        break;
      }
    }
    LOG.debug("validating the table " + createdHFiles);
    if (createdHFiles) validateHFiles(fs, outputPath, family, expectedKVCount);
    else validateTable(conf, table, family, valueMultiplier);

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
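   * Checks that each column family directory under the bulk output path matches the expected
   * family, that every HFile in it is non-empty and, when <code>expectedKVCount</code> is
   * non-negative, that the total KV count across HFiles equals it.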
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {

    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      LOG.debug("The output path has files");
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(String.format(
        "HFile output contains a column family (%s) not present in input families (%s)", cf,
        configFamilies), configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    if (expectedKVCount > -1) {
      assertTrue(
        String.format("KV count in output hfile=<%d> doesn't match with expected KV count=<%d>",
          actualKVCount, expectedKVCount),
        actualKVCount == expectedKVCount);
    }
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier) throws IOException {

    LOG.debug("Validating table.");
    Table table = util.getConnection().getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        scan.setAuthorizations(new Authorizations("secret", "private"));
        ResultScanner resScanner = table.getScanner(scan);
        Result[] next = resScanner.next(5);
        assertEquals(1, next.length);
        for (Result res : resScanner) {
          LOG.debug("Getting results " + res.size());
          assertTrue(res.size() == 2);
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(
            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    assertTrue(verified);
  }

  /**
   * Returns the total number of KVs in the given HFile.
   * @param fs File System
   * @param p  HFile path
   * @return KV count in the given HFile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }

}