/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * IT test used to verify the deletes with visibility labels.
 * <p>
 * The test creates three tables, tablename_0, tablename_1 and tablename_2, and each table is
 * associated with a unique pair of labels. Another common table with the name 'commontable' is
 * created and it holds the data combined from all these 3 tables, such that there are 3 versions
 * of every row but the visibility label in every row corresponds to the table from which the row
 * originated.
 * <p>
 * Deletes are then issued to the common table by selecting the visibility label associated with
 * each of the smaller tables. After the delete is issued with one set of visibility labels we scan
 * the common table with each of the visibility pairs defined for the 3 tables. So after the first
 * delete is issued, a scan with the first set of visibility labels should return zero results,
 * whereas a scan issued with the other two sets of visibility labels should return all the rows
 * corresponding to that set of visibility labels. This process of delete and scan is repeated
 * until, after the last set of visibility labels has been used for the deletes, the common table
 * does not return any row.
 * <p>
 * To use this:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
 * </pre>
 *
 * or
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {

  private static final String CONFIDENTIAL = "confidential";
  private static final String TOPSECRET = "topsecret";
  private static final String SECRET = "secret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String EVERYONE = "everyone";
  private static final String RESTRICTED = "restricted";
  private static final String GROUP = "group";
  private static final String PREVILIGED = "previliged";
  private static final String OPEN = "open";
  /**
   * Comma separated list of every label used by the test. Consecutive entries form the label pair
   * of one table: the table with index {@code i} uses entries {@code 2 * i} and {@code 2 * i + 1}.
   */
  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
    + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
  private static final String COMMA = ",";
  private static final String UNDER_SCORE = "_";
  /** Number of per-label-pair tables created in addition to the common table. */
  public static int DEFAULT_TABLES_COUNT = 3;
  /** Name prefix of the per-label-pair tables; the table index is appended after an underscore. */
  public static String tableName = "tableName";
  public static final String COMMON_TABLE_NAME = "commontable";
  /** Job configuration key under which the comma separated label list is passed to mappers. */
  public static final String LABELS_KEY = "LABELS";
  /** Job configuration key under which the table/label-pair index is passed to mappers. */
  public static final String INDEX_KEY = "INDEX";
  /** Test user created in {@link #setUpCluster()}; used for ACL grants and verification scans. */
  private static User USER;
  private static final String OR = "|";
  private static String USER_OPT = "user";
  private static String userName = "user1";

  /**
   * Generator that writes every generated row into each of the per-label-pair tables, tagging the
   * cells with that table's visibility expression.
   */
  static class VisibilityGenerator extends Generator {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);

    /**
     * Creates the {@link #DEFAULT_TABLES_COUNT} per-label-pair tables plus the common table.
     * <p>
     * A single connection is used both for the access controller check and for the admin
     * operations, so that no connection is left open once the schema has been created.
     * @throws IOException if the connection cannot be established or a table cannot be created
     */
    @Override
    protected void createSchema() throws IOException {
      LOG.info("Creating tables");
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        boolean acl = AccessControlClient.isAccessControllerRunning(conn);
        if (!acl) {
          LOG.info("No ACL available.");
        }
        // Create three tables
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
          createTable(admin, tableName, false, acl);
        }
        // The common table keeps one version per source table.
        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
        createTable(admin, tableName, true, acl);
      }
    }

    /**
     * Creates {@code tableName} with the single column family {@code FAMILY_NAME} if the table
     * does not exist yet, and grants READ permission on it to the test user when ACLs are active.
     * @param admin      admin used to check for and create the table; its connection is also
     *                   used for the permission grant
     * @param tableName  table to create
     * @param setVersion whether to set the family's max versions to {@link #DEFAULT_TABLES_COUNT}
     * @param acl        whether the access controller is running and a grant should be issued
     * @throws IOException if table creation fails or the grant fails (the grant failure is
     *                     wrapped as the cause)
     */
    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
      throws IOException {
      if (admin.tableExists(tableName)) {
        return;
      }
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
      if (setVersion) {
        cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
      }
      TableDescriptor tableDescriptor =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
      admin.createTable(tableDescriptor);
      if (acl) {
        LOG.info("Granting permissions for user " + USER.getShortName());
        Permission.Action[] actions = { Permission.Action.READ };
        try {
          // Reuse the admin's connection instead of opening one that nobody closes.
          AccessControlClient.grant(admin.getConnection(), tableName, USER.getShortName(), null,
            null, actions);
        } catch (Throwable e) {
          LOG.error(HBaseMarkers.FATAL,
            "Error in granting permission for the user " + USER.getShortName(), e);
          throw new IOException(e);
        }
      }
    }

    @Override
    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(VisibilityGeneratorMapper.class);
    }

    /**
     * Generator mapper that persists each generated node into every per-label-pair table through
     * one {@link BufferedMutator} per table.
     */
    static class VisibilityGeneratorMapper extends GeneratorMapper {
      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];

      @Override
      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        super.setup(context);
      }

      /**
       * Opens one buffered mutator (4 MB write buffer) per per-label-pair table.
       * @throws IOException if a mutator cannot be obtained from the connection
       */
      @Override
      protected void instantiateHTable() throws IOException {
        for (int i = 0; i < tables.length; i++) {
          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
          params.writeBufferSize(4 * 1024 * 1024);
          this.tables[i] = connection.getBufferedMutator(params);
        }
      }

      /**
       * Closes every opened mutator. All mutators are attempted even if one fails; the first
       * failure is rethrown with later failures attached as suppressed exceptions.
       * <p>
       * NOTE(review): this override does not call {@code super.cleanup} and does not close the
       * inherited {@code connection}; confirm against the parent mapper whether that is intended.
       * @throws IOException if closing any mutator fails
       */
      @Override
      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        IOException firstFailure = null;
        for (BufferedMutator table : tables) {
          if (table == null) {
            continue;
          }
          try {
            table.close();
          } catch (IOException e) {
            if (firstFailure == null) {
              firstFailure = e;
            } else {
              firstFailure.addSuppressed(e);
            }
          }
        }
        if (firstFailure != null) {
          throw firstFailure;
        }
      }

      /**
       * Writes each node of {@code current} to every per-label-pair table, with the cells tagged
       * with that table's {@code label1|label2} visibility expression.
       * @param output  mapper context, used only to report progress
       * @param count   base count for the row; the count column is written only when it is
       *                non-negative
       * @param prev    previous nodes to link to, or {@code null} to link to {@code NO_KEY}
       * @param current row keys of the nodes to write
       * @param id      client id column value, or {@code null} to skip that column
       * @throws IOException if a mutation fails, or (as {@link InterruptedIOException}) if the
       *                     thread is interrupted between writes
       */
      @Override
      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
        byte[][] prev, byte[][] current, byte[] id) throws IOException {
        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
        for (int i = 0; i < current.length; i++) {
          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
            Put put = new Put(current[i]);
            byte[] value = prev == null ? NO_KEY : prev[i];
            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);

            if (count >= 0) {
              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
            }
            if (id != null) {
              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
            }
            String visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
            put.setCellVisibility(new CellVisibility(visibilityExps));
            tables[j].mutate(put);
            try {
              // NOTE(review): presumably spaces out the writes so they get distinct timestamps;
              // confirm before removing.
              Thread.sleep(1);
            } catch (InterruptedException e) {
              // Preserve the interrupt status and the cause for the caller.
              Thread.currentThread().interrupt();
              throw (IOException) new InterruptedIOException(
                "Interrupted while persisting generated rows").initCause(e);
            }
          }
          if (i % 1000 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }
        }
      }
    }
  }

  /**
   * Map-only job that scans one per-label-pair table and either copies its cells into the common
   * table or issues deletes against the common table, using that table's label pair.
   */
  static class Copier extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
    private TableName tableName;
    private int labelIndex;
    private boolean delete;

    /**
     * @param tableName source table to scan
     * @param index     index of the label pair (and table) to use
     * @param delete    {@code true} to run the delete mapper, {@code false} to run the copy mapper
     */
    public Copier(TableName tableName, int index, boolean delete) {
      this.tableName = tableName;
      this.labelIndex = index;
      this.delete = delete;
    }

    /**
     * Configures and runs the copy/delete job and waits for it to finish.
     * @param outputDir not used by this method
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public int runCopier(String outputDir) throws Exception {
      Job job = Job.getInstance(getConf());
      job.setJobName("Data copier");
      // Use the same keys the mappers read in their setup().
      job.getConfiguration().setInt(INDEX_KEY, labelIndex);
      job.getConfiguration().set(LABELS_KEY, labels);
      job.setJarByClass(getClass());
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      scan.setRaw(true);

      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
      if (delete) {
        LOG.info("Running deletes");
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityDeleteImport.class, null, null, job);
      } else {
        LOG.info("Running copiers");
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityImport.class, null, null, job);
      }
      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);
      job.setNumReduceTasks(0);
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    }

    /**
     * Not implemented; callers in this file invoke {@link #runCopier(String)} directly.
     * @return always 0
     */
    @Override
    public int run(String[] arg0) throws Exception {
      // TODO Auto-generated method stub
      return 0;
    }
  }

  /**
   * Import mapper that tags every put with the visibility expression of the label pair selected
   * through {@link #INDEX_KEY}.
   */
  static class VisibilityImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    /** Reads the label pair index and the label list from the job configuration. */
    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    @Override
    protected void addPutToKv(Put put, Cell kv) throws IOException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      put.setCellVisibility(new CellVisibility(visibilityExps));
      super.addPutToKv(put, kv);
    }
  }

  /**
   * Import mapper that, instead of copying, emits a family delete for every scanned row, tagged
   * with the visibility expression of the label pair selected through {@link #INDEX_KEY}.
   */
  static class VisibilityDeleteImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    /** Reads the label pair index and the label list from the job configuration. */
    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    /**
     * Builds one {@link Delete} for the row, adding a family delete for the family of each
     * non-null cell in {@code result}, and writes it to the context. The {@code put} argument is
     * not used.
     */
    // Creating delete here
    @Override
    protected void processKV(ImmutableBytesWritable key, Result result,
      org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
      org.apache.hadoop.hbase.client.Delete delete) throws IOException, InterruptedException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      for (Cell kv : result.rawCells()) {
        // skip if we filter it out
        if (kv == null) {
          continue;
        }
        // Create deletes here
        if (delete == null) {
          delete = new Delete(key.get());
        }
        delete.setCellVisibility(new CellVisibility(visibilityExps));
        delete.addFamily(CellUtil.cloneFamily(kv));
      }
      if (delete != null) {
        context.write(key, delete);
      }
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addOptWithArg("u", USER_OPT, "User name");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    if (cmd.hasOption(USER_OPT)) {
      userName = cmd.getOptionValue(USER_OPT);
    }
  }

  /**
   * Enables visibility labels on the configuration, makes the current user a superuser, creates
   * the test user, starts the cluster through the parent and finally registers the labels.
   */
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(null);
    Configuration conf = util.getConfiguration();
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.set("hbase.superuser", User.getCurrent().getName());
    conf.setBoolean("dfs.permissions", false);
    USER = User.createUserForTesting(conf, userName, new String[] {});
    super.setUpCluster();
    addLabels();
  }

  /**
   * @param i table index
   * @return the name of the per-label-pair table with index {@code i}
   */
  static TableName getTableName(int i) {
    return TableName.valueOf(tableName + UNDER_SCORE + i);
  }

  /**
   * Registers every label and authorizes the test user for all of them.
   * @throws IOException wrapping any failure reported by the visibility client
   */
  private void addLabels() throws Exception {
    try {
      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
    } catch (Throwable t) {
      throw new IOException(t);
    }
  }

  /**
   * Verify job that scans a table as the test user with the authorizations of one label pair.
   */
  static class VisibilityVerify extends Verify {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
    private TableName tableName;
    private int labelIndex;

    /**
     * @param tableName table to verify
     * @param index     index of the label pair to scan with
     */
    public VisibilityVerify(String tableName, int index) {
      this.tableName = TableName.valueOf(tableName);
      this.labelIndex = index;
    }

    /**
     * Runs the verification job as the test user.
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(final Path outputDir, final int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
      // Kept as an anonymous class: User.runAs is overloaded, so a lambda would be ambiguous.
      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          return doVerify(outputDir, numReducers);
        }
      };
      return USER.runAs(scanAction);
    }

    /**
     * Configures and runs the link verifier job over {@code tableName}, scanning only the
     * {@code COLUMN_PREV} column with this instance's label pair as authorizations.
     * @return 0 if the job succeeded, 1 otherwise
     */
    private int doVerify(Path outputDir, int numReducers)
      throws IOException, InterruptedException, ClassNotFoundException {
      job = Job.getInstance(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);

      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));

      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);
      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    /**
     * Logs, for every counter in the "undef" and "unref" groups, the row key and the region of
     * the common table that hosts it.
     * @throws IOException if the connection or a region lookup fails
     */
    @Override
    protected void handleFailure(Counters counters) throws IOException {
      TableName commonTable = TableName.valueOf(COMMON_TABLE_NAME);
      // One locator for all lookups, closed together with the connection.
      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        RegionLocator locator = conn.getRegionLocator(commonTable)) {
        logRowLocations(locator, counters.getGroup("undef"), "undefined row ");
        logRowLocations(locator, counters.getGroup("unref"), "unreferred row ");
      }
    }

    /** Logs each counter name in {@code group} as a row key together with its region location. */
    private void logRowLocations(RegionLocator locator, CounterGroup group, String messagePrefix)
      throws IOException {
      Iterator<Counter> it = group.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = locator.getRegionLocation(key, true);
        LOG.error(messagePrefix + keyString + ", " + loc);
      }
    }
  }

  /**
   * Driver that repeatedly generates data, copies it into the common table, verifies it and then
   * deletes it label pair by label pair, verifying the common table after every delete.
   */
  static class VisibilityLoop extends Loop {
    private static final int SLEEP_IN_MS = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);

    /**
     * Runs the {@link VisibilityGenerator} into a fresh, randomly named sub-directory of
     * {@code outputDir}.
     * @throws RuntimeException if the generator returns a positive code
     */
    @Override
    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new VisibilityGenerator();
      generator.setConf(getConf());
      int retCode =
        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    /**
     * Runs the delete job for the table with index {@code tableIndex} and then sleeps for
     * {@link #SLEEP_IN_MS}. Only {@code outputDir} and {@code tableIndex} are used.
     */
    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, int tableIndex) throws Exception {
      LOG.info("Running copier on table "
        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
      Copier copier = new Copier(
        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
      copier.setConf(getConf());
      copier.runCopier(outputDir);
      Thread.sleep(SLEEP_IN_MS);
    }

    /**
     * Verifies the common table with every label pair and, when {@code allTables} is set, first
     * verifies each per-label-pair table with its own label pair.
     */
    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
      boolean allTables) throws Exception {
      Path outputPath = new Path(outputDir);

      if (allTables) {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          LOG.info("Verifying table " + i);
          sleep(SLEEP_IN_MS);
          UUID uuid = UUID.randomUUID(); // create a random UUID.
          Path iterationOutput = new Path(outputPath, uuid.toString());
          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
          verify(numReducers, expectedNumNodes, iterationOutput, verify);
        }
      }
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
      }
    }

    /**
     * Verifies the common table after the deletes for label pairs {@code 0..tableIndex} have been
     * issued: those pairs are expected to see no nodes, the remaining pairs {@code expectedNodes}.
     */
    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
      throws Exception {
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        long expectedForIndex = (i <= tableIndex) ? 0 : expectedNodes;
        LOG.info("Verifying data in the table with index " + i + " and expected nodes is "
          + expectedForIndex);
        runVerifyCommonTable(outputDir, numReducers, expectedForIndex, i);
      }
    }

    private void sleep(long ms) throws InterruptedException {
      Thread.sleep(ms);
    }

    /**
     * Sleeps for {@link #SLEEP_IN_MS} and then verifies the common table using the label pair
     * with the given {@code index}, writing to a randomly named sub-directory of
     * {@code outputDir}.
     */
    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
      int index) throws Exception {
      LOG.info("Verifying common table with index " + index);
      sleep(SLEEP_IN_MS);
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());
      Verify verify =
        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
      verify(numReducers, expectedNumNodes, iterationOutput, verify);
    }

    /** Copies every per-label-pair table into the common table, sleeping before each copy. */
    protected void runCopier(String outputDir) throws Exception {
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
        sleep(SLEEP_IN_MS);
        Copier copier =
          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
        copier.setConf(getConf());
        copier.runCopier(outputDir);
      }
    }

    /**
     * Runs {@code verify} and checks its node count.
     * @throws RuntimeException if the job returns a positive code or the count check fails
     */
    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
      throws Exception {
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
    }

    /**
     * Entry point of the loop.
     * @param args {@code <num iterations> <num mappers> <num nodes per mapper> <output dir>
     *             <num reducers> [<width> <wrap multiplier>]}; a negative iteration count means
     *             {@link Integer#MAX_VALUE} iterations
     * @return 1 if too few arguments were given, 0 once all iterations completed
     */
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println(
          "Usage: Loop <num iterations> " + "<num mappers> <num nodes per mapper> <output dir> "
            + "<num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        LOG.info("Generating data");
        // By default run no concurrent walkers for test with visibility
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
        expectedNumNodes += numMappers * numNodes;
        // Copying wont work because expressions are not returned back to the
        // client
        LOG.info("Running copier");
        sleep(SLEEP_IN_MS);
        runCopier(outputDir);
        LOG.info("Verifying copied data");
        sleep(SLEEP_IN_MS);
        runVerify(outputDir, numReducers, expectedNumNodes, true);
        sleep(SLEEP_IN_MS);
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          LOG.info("Deleting data on table with index: " + j);
          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
          sleep(SLEEP_IN_MS);
          LOG.info("Verifying common table after deleting");
          runVerify(outputDir, numReducers, expectedNumNodes, j);
          sleep(SLEEP_IN_MS);
        }
      }
      return 0;
    }
  }

  /** Runs one iteration of the {@link VisibilityLoop} with one mapper and 20000 nodes. */
  @Override
  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
    // <num reducers>
    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
      new String[] { "1", "1", "20000",
        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
        "10000" });
    org.junit.Assert.assertEquals(0, ret);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
    System.exit(ret);
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
  }
}