/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Integration test that verifies deletes with visibility labels. The test creates three tables,
 * tablename_0, tablename_1 and tablename_2, each associated with a unique pair of labels. A common
 * table named 'commontable' is also created; it holds the data of all three tables combined, so
 * every row has three versions, each carrying the visibility expression of the table it came from.
 * Deletes are then issued against the common table using the label pair of one source table at a
 * time. After each delete, the common table is scanned with each of the three label pairs: the
 * scan using the just-deleted pair should return zero rows, while scans with the other pairs
 * should still return all rows for their labels. This delete-and-scan cycle repeats until, after
 * the last label pair has been used for deletes, the common table returns no rows at all. To run
 * this test:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
 * </pre>
 *
 * or
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {

  private static final String CONFIDENTIAL = "confidential";
  private static final String TOPSECRET = "topsecret";
  private static final String SECRET = "secret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String EVERYONE = "everyone";
  private static final String RESTRICTED = "restricted";
  private static final String GROUP = "group";
  private static final String PRIVILEGED = "privileged";
  private static final String OPEN = "open";
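  // The ten labels are consumed in adjacent pairs: table i is written with the visibility
  // expression labels[2i] | labels[2i+1] (see VisibilityGeneratorMapper.persist below).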
  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
    + "," + PRIVATE + "," + PRIVILEGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
  private static final String COMMA = ",";
  private static final String UNDER_SCORE = "_";
  public static int DEFAULT_TABLES_COUNT = 3;
  public static String tableName = "tableName";
  public static final String COMMON_TABLE_NAME = "commontable";
  public static final String LABELS_KEY = "LABELS";
  public static final String INDEX_KEY = "INDEX";
  private static User USER;
  private static final String OR = "|";
  private static String USER_OPT = "user";
  private static String userName = "user1";

  static class VisibilityGenerator extends Generator {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityGenerator.class);

    @Override
    protected void createSchema() throws IOException {
      LOG.info("Creating tables");
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        // Reuse the shared connection for the ACL check instead of leaking a dedicated one.
        boolean acl = AccessControlClient.isAccessControllerRunning(conn);
        if (!acl) {
          LOG.info("No ACL available.");
        }
        // Create the three source tables, then the common table with extra versions enabled.
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
          createTable(admin, tableName, false, acl);
        }
        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
        createTable(admin, tableName, true, acl);
      }
    }

    private void createTable(Admin admin, TableName tableName, boolean setVersion, boolean acl)
      throws IOException {
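      // setVersion is true only for the common table, which must retain one version per
      // source table so that all DEFAULT_TABLES_COUNT copies of a row can coexist.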
      if (!admin.tableExists(tableName)) {
        ColumnFamilyDescriptorBuilder cfBuilder =
          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME);
        if (setVersion) {
          cfBuilder.setMaxVersions(DEFAULT_TABLES_COUNT);
        }
        TableDescriptor tableDescriptor =
          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfBuilder.build()).build();
        admin.createTable(tableDescriptor);
        if (acl) {
          LOG.info("Granting permissions for user " + USER.getShortName());
          Permission.Action[] actions = { Permission.Action.READ };
          try (Connection conn = ConnectionFactory.createConnection(getConf())) {
            // Close the connection used for the grant once we are done with it.
            AccessControlClient.grant(conn, tableName, USER.getShortName(), null, null, actions);
          } catch (Throwable e) {
            LOG.error(HBaseMarkers.FATAL,
              "Error in granting permission for the user " + USER.getShortName(), e);
            throw new IOException(e);
          }
        }
      }
    }

    @Override
    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(VisibilityGeneratorMapper.class);
    }

    static class VisibilityGeneratorMapper extends GeneratorMapper {
      BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];

      @Override
      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        super.setup(context);
      }

      @Override
      protected void instantiateHTable() throws IOException {
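        // One BufferedMutator per source table; persist() writes every generated row to
        // all of them, each tagged with that table's visibility expression.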
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
          params.writeBufferSize(4 * 1024 * 1024);
          BufferedMutator table = connection.getBufferedMutator(params);
          this.tables[i] = table;
        }
      }

      @Override
      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          if (tables[i] != null) {
            tables[i].close();
          }
        }
      }

      @Override
      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
        byte[][] prev, byte[][] current, byte[] id) throws IOException {
        String visibilityExps = "";
        String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
        for (int i = 0; i < current.length; i++) {
          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
            Put put = new Put(current[i]);
            byte[] value = prev == null ? NO_KEY : prev[i];
            put.addColumn(FAMILY_NAME, COLUMN_PREV, value);

            if (count >= 0) {
              put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
            }
            if (id != null) {
              put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
            }
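            // Pair off the labels: table j gets the expression labels[2j] | labels[2j+1].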
            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
            put.setCellVisibility(new CellVisibility(visibilityExps));
            tables[j].mutate(put);
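            // Assumption: the 1 ms pause keeps cell timestamps distinct across the three
            // tables, so all versions survive later in the multi-version common table.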
            try {
              Thread.sleep(1);
            } catch (InterruptedException e) {
              // Restore the interrupt flag before converting to an IOException.
              Thread.currentThread().interrupt();
              throw new IOException(e);
            }
          }
          if (i % 1000 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }
        }
      }
    }
  }

  static class Copier extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Copier.class);
    private TableName tableName;
    private int labelIndex;
    private boolean delete;

    public Copier(TableName tableName, int index, boolean delete) {
      this.tableName = tableName;
      this.labelIndex = index;
      this.delete = delete;
    }

    public int runCopier(String outputDir) throws Exception {
      Job job = Job.getInstance(getConf());
      job.setJobName("Data copier");
      job.getConfiguration().setInt(INDEX_KEY, labelIndex);
      job.getConfiguration().set(LABELS_KEY, labels);
      job.setJarByClass(getClass());
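      // Raw scan so that delete markers and all versions are carried over, restricted
      // to the label pair assigned to this table index.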
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      scan.setRaw(true);

      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
      if (delete) {
        LOG.info("Running deletes");
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityDeleteImport.class, null, null, job);
      } else {
        LOG.info("Running copiers");
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
          VisibilityImport.class, null, null, job);
      }
      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);
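      // Map-only job: initTableReducerJob wires up TableOutputFormat for the common
      // table, and with zero reduce tasks the mappers write to it directly.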
      job.setNumReduceTasks(0);
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] arg0) throws Exception {
      // Unused Tool entry point; callers invoke runCopier(String) directly.
      return 0;
    }
  }

  static class VisibilityImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    @Override
    protected void addPutToKv(Put put, Cell kv) throws IOException {
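      // Re-apply this table's visibility expression on write; the raw copy scan does
      // not return the original expressions to the client.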
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      put.setCellVisibility(new CellVisibility(visibilityExps));
      super.addPutToKv(put, kv);
    }
  }

  static class VisibilityDeleteImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    // Converts each scanned row into a family-level Delete tagged with this table's
    // visibility expression, instead of re-emitting it as a Put.
    @Override
    protected void processKV(ImmutableBytesWritable key, Result result,
      org.apache.hadoop.mapreduce.Mapper.Context context, Put put, Delete delete)
      throws IOException, InterruptedException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      for (Cell kv : result.rawCells()) {
        // skip if we filter it out
        if (kv == null) continue;
        // Create deletes here
        if (delete == null) {
          delete = new Delete(key.get());
        }
        delete.setCellVisibility(new CellVisibility(visibilityExps));
        delete.addFamily(CellUtil.cloneFamily(kv));
      }
      if (delete != null) {
        context.write(key, delete);
      }
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addOptWithArg("u", USER_OPT, "User name");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    if (cmd.hasOption(USER_OPT)) {
      userName = cmd.getOptionValue(USER_OPT);
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(null);
    Configuration conf = util.getConfiguration();
    VisibilityTestUtil.enableVisiblityLabels(conf);
    conf.set("hbase.superuser", User.getCurrent().getName());
    conf.setBoolean("dfs.permissions", false);
    USER = User.createUserForTesting(conf, userName, new String[] {});
    super.setUpCluster();
    addLabels();
  }

  static TableName getTableName(int i) {
    return TableName.valueOf(tableName + UNDER_SCORE + i);
  }

  private void addLabels() throws Exception {
    try {
      VisibilityClient.addLabels(util.getConnection(), labels.split(COMMA));
      VisibilityClient.setAuths(util.getConnection(), labels.split(COMMA), USER.getName());
    } catch (Throwable t) {
      throw new IOException(t);
    }
  }

  static class VisibilityVerify extends Verify {
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityVerify.class);
    private TableName tableName;
    private int labelIndex;

    public VisibilityVerify(String tableName, int index) {
      this.tableName = TableName.valueOf(tableName);
      this.labelIndex = index;
    }

    @Override
    public int run(final Path outputDir, final int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          return doVerify(outputDir, numReducers);
        }
      };
      return USER.runAs(scanAction);
    }

    private int doVerify(Path outputDir, int numReducers)
      throws IOException, InterruptedException, ClassNotFoundException {
      job = Job.getInstance(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);

      scan.setAuthorizations(
        new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));

      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);
      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    @Override
    protected void handleFailure(Counters counters) throws IOException {
      try (Connection conn = ConnectionFactory.createConnection(job.getConfiguration())) {
        TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
        CounterGroup g = counters.getGroup("undef");
        Iterator<Counter> it = g.iterator();
        while (it.hasNext()) {
          String keyString = it.next().getName();
          byte[] key = Bytes.toBytes(keyString);
          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
          LOG.error("undefined row " + keyString + ", " + loc);
        }
        g = counters.getGroup("unref");
        it = g.iterator();
        while (it.hasNext()) {
          String keyString = it.next().getName();
          byte[] key = Bytes.toBytes(keyString);
          HRegionLocation loc = conn.getRegionLocator(tableName).getRegionLocation(key, true);
          LOG.error("unreferenced row " + keyString + ", " + loc);
        }
      }
    }
  }

  static class VisibilityLoop extends Loop {
    private static final int SLEEP_IN_MS = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);

    @Override
    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new VisibilityGenerator();
      generator.setConf(getConf());
      int retCode =
        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, int tableIndex) throws Exception {
      LOG.info("Running delete on table "
        + IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
      Copier copier = new Copier(
        IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
      copier.setConf(getConf());
      copier.runCopier(outputDir);
      Thread.sleep(SLEEP_IN_MS);
    }

    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
      boolean allTables) throws Exception {
      Path outputPath = new Path(outputDir);

      if (allTables) {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          LOG.info("Verifying table " + i);
          sleep(SLEEP_IN_MS);
          UUID uuid = UUID.randomUUID(); // create a random UUID.
          Path iterationOutput = new Path(outputPath, uuid.toString());
          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
          verify(numReducers, expectedNumNodes, iterationOutput, verify);
        }
      }
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
      }
    }

    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
      throws Exception {
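      // Label pairs already used for deletes (index <= tableIndex) should now scan as
      // empty; the remaining pairs should still see every row.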
      long temp = expectedNodes;
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        if (i <= tableIndex) {
          expectedNodes = 0;
        } else {
          expectedNodes = temp;
        }
        LOG.info(
          "Verifying data in the table with index " + i + ", expected nodes=" + expectedNodes);
        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
      }
    }

    private void sleep(long ms) throws InterruptedException {
      Thread.sleep(ms);
    }

    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
      int index) throws Exception {
      LOG.info("Verifying common table with index " + index);
      sleep(SLEEP_IN_MS);
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());
      Verify verify =
        new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(), index);
      verify(numReducers, expectedNumNodes, iterationOutput, verify);
    }

    protected void runCopier(String outputDir) throws Exception {
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
        sleep(SLEEP_IN_MS);
        Copier copier =
          new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i, false);
        copier.setConf(getConf());
        copier.runCopier(outputDir);
      }
    }

    private void verify(int numReducers, long expectedNumNodes, Path iterationOutput, Verify verify)
      throws Exception {
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println(
          "Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> "
            + "<num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args: " + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        LOG.info("Generating data");
        // By default, run no concurrent walkers for the visibility test.
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
        expectedNumNodes += numMappers * numNodes;
        // A plain copy cannot preserve visibility expressions, since scans do not return
        // them to the client; the copier re-applies the per-table expression on write.
        LOG.info("Running copier");
        sleep(SLEEP_IN_MS);
        runCopier(outputDir);
        LOG.info("Verifying copied data");
        sleep(SLEEP_IN_MS);
        runVerify(outputDir, numReducers, expectedNumNodes, true);
        sleep(SLEEP_IN_MS);
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          LOG.info("Deleting data on table with index: " + j);
          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
          sleep(SLEEP_IN_MS);
          LOG.info("Verifying common table after deleting");
          runVerify(outputDir, numReducers, expectedNumNodes, j);
          sleep(SLEEP_IN_MS);
        }
      }
      return 0;
    }
  }

  @Override
  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
    // <num reducers>
    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new VisibilityLoop(),
      new String[] { "1", "1", "20000",
        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(), "1",
        "10000" });
    org.junit.Assert.assertEquals(0, ret);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
    System.exit(ret);
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
  }
}